|
a |
|
b/tests/data/test_parquet.py |
|
|
1 |
from itertools import islice |
|
|
2 |
from pathlib import Path |
|
|
3 |
|
|
|
4 |
import pyarrow.dataset |
|
|
5 |
import pyarrow.fs |
|
|
6 |
import pytest |
|
|
7 |
from confit.utils.random import set_seed |
|
|
8 |
from typing_extensions import Literal |
|
|
9 |
|
|
|
10 |
import edsnlp |
|
|
11 |
from edsnlp.data.converters import get_dict2doc_converter, get_doc2dict_converter |
|
|
12 |
from edsnlp.utils.collections import dl_to_ld |
|
|
13 |
|
|
|
14 |
|
|
|
def assert_doc_read(doc):
    """Check that a doc read from the test parquet dataset matches the gold annotations.

    Verifies the doc-level attributes set by the "omop" converter, then compares
    every entity / span-group annotation (boundaries, text, and the "etat" /
    "assertion" extension attributes) against the expected values.
    """
    # Doc-level attributes produced by the omop dict -> doc converter
    assert doc._.note_id == "subfolder/doc-1"
    assert doc._.context_var == "test"

    attrs = ("etat", "assertion")
    # Collect (start, end, text, (etat, assertion)) tuples for doc.ents
    # (under the "__ents__" key) and for each span group in doc.spans.
    # Sorting makes the comparison independent of span insertion order.
    spans_and_attributes = {
        "__ents__": sorted(
            [
                (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs))
                for e in doc.ents
            ]
        ),
        **{
            name: sorted(
                [
                    (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs))
                    for e in doc.spans[name]
                ]
            )
            for name in doc.spans
        },
    }

    assert spans_and_attributes == {
        "__ents__": [
            (6, 7, "douleurs", (None, None)),
            (7, 11, "dans le bras droit", (None, None)),
            (17, 21, "problème \nde locomotion", (None, "absent")),
            (25, 26, "AVC", ("passé", "non-associé")),
            (35, 36, "rhume", ("présent", "hypothétique")),
            (45, 46, "rhume", ("présent", "hypothétique")),
            (51, 52, "Douleurs", (None, None)),
            (52, 56, "dans le bras droit", (None, None)),
            (68, 69, "anomalie", (None, "absent")),
        ],
        "anatomie": [
            (9, 11, "bras droit", (None, None)),
            (54, 56, "bras droit", (None, None)),
        ],
        "localisation": [
            (7, 11, "dans le bras droit", (None, None)),
            (52, 56, "dans le bras droit", (None, None)),
        ],
        "pathologie": [
            (17, 21, "problème \nde locomotion", (None, "absent")),
            (25, 26, "AVC", ("passé", "non-associé")),
            (35, 36, "rhume", ("présent", "hypothétique")),
            (45, 46, "rhume", ("présent", "hypothétique")),
        ],
        "sosy": [
            (6, 7, "douleurs", (None, None)),
            (51, 52, "Douleurs", (None, None)),
            (68, 69, "anomalie", (None, "absent")),
        ],
    }

|

# Expected OMOP-style export of the "subfolder/doc-1" test document: one dict
# with doc-level fields (note_id, context_var, note_text) and one record per
# entity in "entities" (character offsets, span-group label in
# note_nlp_source_value, and the "etat"/"assertion" extension attributes).
# Note: the first entity has etat == "test" because the tests mutate
# doc.ents[0]._.etat before writing.
GOLD_OMOP = {
    "entities": [
        {
            "assertion": None,
            "end_char": 38,
            "etat": "test",
            "lexical_variant": "douleurs",
            "note_nlp_id": 0,
            "note_nlp_source_value": "sosy",
            "start_char": 30,
        },
        {
            "assertion": None,
            "end_char": 57,
            "etat": None,
            "lexical_variant": "dans le bras droit",
            "note_nlp_id": 1,
            "note_nlp_source_value": "localisation",
            "start_char": 39,
        },
        {
            "assertion": None,
            "end_char": 57,
            "etat": None,
            "lexical_variant": "bras droit",
            "note_nlp_id": 2,
            "note_nlp_source_value": "anatomie",
            "start_char": 47,
        },
        {
            "assertion": "absent",
            "end_char": 98,
            "etat": None,
            "lexical_variant": "problème \nde locomotion",
            "note_nlp_id": 3,
            "note_nlp_source_value": "pathologie",
            "start_char": 75,
        },
        {
            "assertion": "non-associé",
            "end_char": 117,
            "etat": "passé",
            "lexical_variant": "AVC",
            "note_nlp_id": 4,
            "note_nlp_source_value": "pathologie",
            "start_char": 114,
        },
        {
            "assertion": "hypothétique",
            "end_char": 164,
            "etat": "présent",
            "lexical_variant": "rhume",
            "note_nlp_id": 5,
            "note_nlp_source_value": "pathologie",
            "start_char": 159,
        },
        {
            "assertion": "hypothétique",
            "end_char": 296,
            "etat": "présent",
            "lexical_variant": "rhume",
            "note_nlp_id": 6,
            "note_nlp_source_value": "pathologie",
            "start_char": 291,
        },
        {
            "assertion": None,
            "end_char": 314,
            "etat": None,
            "lexical_variant": "Douleurs",
            "note_nlp_id": 7,
            "note_nlp_source_value": "sosy",
            "start_char": 306,
        },
        {
            "assertion": None,
            "end_char": 333,
            "etat": None,
            "lexical_variant": "dans le bras droit",
            "note_nlp_id": 8,
            "note_nlp_source_value": "localisation",
            "start_char": 315,
        },
        {
            "assertion": None,
            "end_char": 333,
            "etat": None,
            "lexical_variant": "bras droit",
            "note_nlp_id": 9,
            "note_nlp_source_value": "anatomie",
            "start_char": 323,
        },
        {
            "assertion": "absent",
            "end_char": 386,
            "etat": None,
            "lexical_variant": "anomalie",
            "note_nlp_id": 10,
            "note_nlp_source_value": "sosy",
            "start_char": 378,
        },
    ],
    "note_id": "subfolder/doc-1",
    "context_var": "test",
    "note_text": "Le patient est admis pour des douleurs dans le bras droit, mais "
    "n'a pas de problème \n"
    "de locomotion. \n"
    "Historique d'AVC dans la famille. pourrait être un cas de "
    "rhume.\n"
    "NBNbWbWbNbWbNBNbNbWbWbNBNbWbNbNbWbNBNbWbNbNBWbWbNbNbNBWbNbWbNbWBNbNbWbNbNBNbWb"
    "WbNbWBNbNbWbNBNbWbWbNb\n"
    "Pourrait être un cas de rhume.\n"
    "Motif :\n"
    "Douleurs dans le bras droit.\n"
    "ANTÉCÉDENTS\n"
    "Le patient est déjà venu\n"
    "Pas d'anomalie détectée.\n",
}
|
190 |
|
|
|
191 |
|
|
|
192 |
def assert_doc_write_omop(exported_obj): |
|
|
193 |
assert exported_obj == GOLD_OMOP |
|
|
def assert_doc_write_ents(exported_objs):
    """Check that records exported with the "ents" converter match the expectation.

    The expected records are derived from ``GOLD_OMOP`` by round-tripping it
    through the "omop" dict -> doc converter and the "ents" doc -> dict
    converter, so this test only has a single gold source of truth.
    """
    in_converter, kwargs = get_dict2doc_converter(
        "omop",
        dict(
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
        ),
    )
    doc = in_converter(GOLD_OMOP, **kwargs)
    out_converter, kwargs = get_doc2dict_converter(
        "ents",
        dict(
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
            span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        ),
    )
    # PEP 8: a local variable is lower_case (this was previously named GOLD_ENTS,
    # which suggested a module-level constant).
    expected_ents = out_converter(doc, **kwargs)
    assert exported_objs == expected_ents
|

def test_read_write_in_worker(blank_nlp, tmpdir):
    """Round-trip the parquet test dataset with read_in_worker/write_in_worker
    enabled and check that the output dataset is byte-equivalent to the input."""
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    output_dir = Path(tmpdir)
    edsnlp.data.read_parquet(
        input_dir,
        converter="omop",
        span_attributes=["etat", "assertion"],
        doc_attributes=["context_var"],
        read_in_worker=True,
    ).write_parquet(
        output_dir / "docs.parquet",
        converter="omop",
        doc_attributes=["context_var"],
        span_attributes=["etat", "assertion"],
        span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        write_in_worker=True,
    )
    # Compare the written dataset to the source dataset record by record
    # (dl_to_ld turns the columnar pydict into an iterable of row dicts).
    # fmt: off
    assert (
        list(dl_to_ld(pyarrow.dataset.dataset(output_dir / "docs.parquet").to_table().to_pydict()))  # noqa: E501
        == list(dl_to_ld(pyarrow.dataset.dataset(input_dir).to_table().to_pydict()))
    )
    # fmt: on
|
240 |
|
|
|
241 |
|
|
|
242 |
def test_read_to_parquet(blank_nlp, tmpdir, run_in_test_dir): |
|
|
243 |
output_dir = Path(tmpdir) |
|
|
244 |
fs = pyarrow.fs.LocalFileSystem() |
|
|
245 |
doc = list( |
|
|
246 |
edsnlp.data.read_parquet( |
|
|
247 |
"../resources/docs.parquet", |
|
|
248 |
converter="omop", |
|
|
249 |
span_attributes=["etat", "assertion"], |
|
|
250 |
doc_attributes=["context_var"], |
|
|
251 |
filesystem=fs, |
|
|
252 |
) |
|
|
253 |
)[0] |
|
|
254 |
assert_doc_read(doc) |
|
|
255 |
doc.ents[0]._.etat = "test" |
|
|
256 |
|
|
|
257 |
edsnlp.data.write_parquet( |
|
|
258 |
[doc], |
|
|
259 |
output_dir, |
|
|
260 |
converter="omop", |
|
|
261 |
doc_attributes=["context_var"], |
|
|
262 |
span_attributes=["etat", "assertion"], |
|
|
263 |
span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], |
|
|
264 |
) |
|
|
265 |
|
|
|
266 |
assert_doc_write_omop( |
|
|
267 |
next(dl_to_ld(pyarrow.dataset.dataset(output_dir).to_table().to_pydict())) |
|
|
268 |
) |
|
|
269 |
|
|
|
270 |
with pytest.raises(FileExistsError): |
|
|
271 |
edsnlp.data.write_parquet( |
|
|
272 |
[doc], |
|
|
273 |
output_dir, |
|
|
274 |
converter="omop", |
|
|
275 |
doc_attributes=["context_var"], |
|
|
276 |
span_attributes=["etat", "assertion"], |
|
|
277 |
span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], |
|
|
278 |
) |
|
|
279 |
|
|
|
280 |
edsnlp.data.write_parquet( |
|
|
281 |
[doc], |
|
|
282 |
output_dir, |
|
|
283 |
converter="omop", |
|
|
284 |
doc_attributes=["context_var"], |
|
|
285 |
span_attributes=["etat", "assertion"], |
|
|
286 |
span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], |
|
|
287 |
overwrite=True, |
|
|
288 |
) |
|
|
289 |
|
|
|
290 |
|
|
|
291 |
def test_read_to_parquet_ents(blank_nlp, tmpdir): |
|
|
292 |
input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet" |
|
|
293 |
output_dir = Path(tmpdir) |
|
|
294 |
fs = pyarrow.fs.LocalFileSystem() |
|
|
295 |
doc = list( |
|
|
296 |
edsnlp.data.read_parquet( |
|
|
297 |
input_dir, |
|
|
298 |
converter="omop", |
|
|
299 |
span_attributes=["etat", "assertion"], |
|
|
300 |
doc_attributes=["context_var"], |
|
|
301 |
filesystem=fs, |
|
|
302 |
) |
|
|
303 |
)[0] |
|
|
304 |
assert_doc_read(doc) |
|
|
305 |
doc.ents[0]._.etat = "test" |
|
|
306 |
|
|
|
307 |
edsnlp.data.write_parquet( |
|
|
308 |
[doc], |
|
|
309 |
output_dir, |
|
|
310 |
converter="ents", |
|
|
311 |
doc_attributes=["context_var"], |
|
|
312 |
span_attributes=["etat", "assertion"], |
|
|
313 |
span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], |
|
|
314 |
num_rows_per_file=1024, # deprecated but test for backward compatibility |
|
|
315 |
) |
|
|
316 |
|
|
|
317 |
assert_doc_write_ents( |
|
|
318 |
list(dl_to_ld(pyarrow.dataset.dataset(output_dir).to_table().to_pydict())) |
|
|
319 |
) |
|
|
320 |
|
|
|
321 |
with pytest.raises(FileExistsError): |
|
|
322 |
edsnlp.data.write_parquet( |
|
|
323 |
[doc], |
|
|
324 |
output_dir, |
|
|
325 |
converter="ents", |
|
|
326 |
doc_attributes=["context_var"], |
|
|
327 |
span_attributes=["etat", "assertion"], |
|
|
328 |
span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], |
|
|
329 |
) |
|
|
330 |
|
|
|
331 |
|
|
|
332 |
@pytest.mark.parametrize("num_cpu_workers", [0, 2]) |
|
|
333 |
@pytest.mark.parametrize("shuffle", ["dataset", "fragment"]) |
|
|
334 |
@pytest.mark.parametrize("shuffle_reader", [False, None]) |
|
|
335 |
def test_read_shuffle_loop( |
|
|
336 |
num_cpu_workers: int, |
|
|
337 |
shuffle: Literal["dataset", "fragment"], |
|
|
338 |
shuffle_reader: bool, |
|
|
339 |
): |
|
|
340 |
input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet" |
|
|
341 |
notes = ( |
|
|
342 |
edsnlp.data.read_parquet(input_dir, loop=True) |
|
|
343 |
.shuffle(batch_by=shuffle, seed=42, shuffle_reader=shuffle_reader) |
|
|
344 |
.map(lambda x: x["note_id"]) |
|
|
345 |
.set_processing(num_cpu_workers=num_cpu_workers) |
|
|
346 |
) |
|
|
347 |
notes = list(islice(notes, 6)) |
|
|
348 |
if not (num_cpu_workers > 1 and not shuffle_reader): |
|
|
349 |
assert notes == [ |
|
|
350 |
"subfolder/doc-2", |
|
|
351 |
"subfolder/doc-1", |
|
|
352 |
"subfolder/doc-3", |
|
|
353 |
"subfolder/doc-3", |
|
|
354 |
"subfolder/doc-2", |
|
|
355 |
"subfolder/doc-1", |
|
|
356 |
] |
|
|
357 |
|
|
|
358 |
|
|
|
359 |
@pytest.mark.parametrize("num_cpu_workers", [0, 2]) |
|
|
360 |
@pytest.mark.parametrize("work_unit", ["record", "fragment"]) |
|
|
361 |
@pytest.mark.parametrize("shuffle", [False, "dataset", "fragment"]) |
|
|
362 |
def test_read_work_unit( |
|
|
363 |
num_cpu_workers, |
|
|
364 |
work_unit: Literal["record", "fragment"], |
|
|
365 |
shuffle: Literal[False, "dataset", "fragment"], |
|
|
366 |
): |
|
|
367 |
if shuffle == "dataset" and work_unit == "fragment": |
|
|
368 |
pytest.skip("Dataset-level shuffle is not supported with fragment work unit") |
|
|
369 |
input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet" |
|
|
370 |
set_seed(42) |
|
|
371 |
stream = edsnlp.data.read_parquet( |
|
|
372 |
input_dir, work_unit=work_unit, shuffle=shuffle |
|
|
373 |
).set_processing( |
|
|
374 |
num_cpu_workers=num_cpu_workers, |
|
|
375 |
) |
|
|
376 |
stream = stream.map_batches( |
|
|
377 |
lambda b: "|".join(sorted([x["note_id"] for x in b])), batch_size=1000 |
|
|
378 |
) |
|
|
379 |
if work_unit == "fragment" and num_cpu_workers == 2 or num_cpu_workers == 0: |
|
|
380 |
assert list(stream) == ["subfolder/doc-1|subfolder/doc-2|subfolder/doc-3"] |
|
|
381 |
else: |
|
|
382 |
assert list(stream) == ["subfolder/doc-1|subfolder/doc-3", "subfolder/doc-2"] |
|
|
383 |
|
|
|
384 |
|
|
|
385 |
@pytest.mark.parametrize( |
|
|
386 |
"num_cpu_workers,write_in_worker", |
|
|
387 |
[ |
|
|
388 |
(0, False), |
|
|
389 |
(2, True), |
|
|
390 |
(2, False), |
|
|
391 |
], |
|
|
392 |
) |
|
|
393 |
def test_write_parquet_fragment(tmpdir, num_cpu_workers, write_in_worker): |
|
|
394 |
input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet" |
|
|
395 |
output_dir = Path(tmpdir) |
|
|
396 |
notes = edsnlp.data.read_parquet(input_dir, converter="omop") |
|
|
397 |
notes = notes.map_batches( |
|
|
398 |
lambda b: [y for x in b for y in (x,) * (4 if "doc-2" in x._.note_id else 1)] |
|
|
399 |
) |
|
|
400 |
notes = notes.set_processing( |
|
|
401 |
num_cpu_workers=num_cpu_workers, |
|
|
402 |
deterministic=True, # by default |
|
|
403 |
) |
|
|
404 |
notes.write_parquet( |
|
|
405 |
output_dir, |
|
|
406 |
batch_size="fragment", |
|
|
407 |
converter=lambda x: {"note_id": x._.note_id}, |
|
|
408 |
write_in_worker=write_in_worker, |
|
|
409 |
) |
|
|
410 |
input_ds = pyarrow.dataset.dataset(input_dir) |
|
|
411 |
inputs = [o["note_id"] for o in dl_to_ld(input_ds.to_table().to_pydict())] |
|
|
412 |
assert len(list(input_ds.get_fragments())) == 1 |
|
|
413 |
assert inputs == ["subfolder/doc-1", "subfolder/doc-2", "subfolder/doc-3"] |
|
|
414 |
out_ds = pyarrow.dataset.dataset(output_dir) |
|
|
415 |
outs = [o["note_id"] for o in dl_to_ld(out_ds.to_table().to_pydict())] |
|
|
416 |
if write_in_worker and num_cpu_workers == 2: |
|
|
417 |
# Depending on the order in which the 2 workers produces the batches |
|
|
418 |
assert len(list(out_ds.get_fragments())) == 2 |
|
|
419 |
assert outs == [ |
|
|
420 |
"subfolder/doc-1", |
|
|
421 |
"subfolder/doc-3", |
|
|
422 |
"subfolder/doc-2", |
|
|
423 |
"subfolder/doc-2", |
|
|
424 |
"subfolder/doc-2", |
|
|
425 |
"subfolder/doc-2", |
|
|
426 |
] or outs == [ |
|
|
427 |
"subfolder/doc-2", |
|
|
428 |
"subfolder/doc-2", |
|
|
429 |
"subfolder/doc-2", |
|
|
430 |
"subfolder/doc-2", |
|
|
431 |
"subfolder/doc-1", |
|
|
432 |
"subfolder/doc-3", |
|
|
433 |
] |
|
|
434 |
elif not write_in_worker and num_cpu_workers == 2: |
|
|
435 |
assert len(list(out_ds.get_fragments())) == 1 |
|
|
436 |
assert outs == [ |
|
|
437 |
"subfolder/doc-1", |
|
|
438 |
"subfolder/doc-2", |
|
|
439 |
"subfolder/doc-3", |
|
|
440 |
"subfolder/doc-2", |
|
|
441 |
"subfolder/doc-2", |
|
|
442 |
"subfolder/doc-2", |
|
|
443 |
] |
|
|
444 |
else: # simple case |
|
|
445 |
assert len(list(out_ds.get_fragments())) == 1 |
|
|
446 |
assert outs == [ |
|
|
447 |
"subfolder/doc-1", |
|
|
448 |
"subfolder/doc-2", |
|
|
449 |
"subfolder/doc-2", |
|
|
450 |
"subfolder/doc-2", |
|
|
451 |
"subfolder/doc-2", |
|
|
452 |
"subfolder/doc-3", |
|
|
453 |
] |