# Processing multiple texts

In the previous tutorials, we've seen how to apply a spaCy NLP pipeline to a single text.
Once the pipeline is tested and ready to be applied on an entire corpus, we'll want to deploy it efficiently.

In this tutorial, we'll cover a few best practices and some _caveats_ to be aware of.
Then, we'll explore methods that EDS-NLP provides to perform inference on multiple texts.

Consider this simple pipeline:

```python
import edsnlp, edsnlp.pipes as eds

nlp = edsnlp.blank("eds")

nlp.add_pipe(eds.sentences())
nlp.add_pipe(eds.normalizer())

nlp.add_pipe(
    eds.matcher(
        terms=dict(patient=["patient", "malade"]),
        attr="NORM",
    ),
)

# Add qualifiers
nlp.add_pipe(eds.negation())
nlp.add_pipe(eds.hypothesis())
nlp.add_pipe(eds.family())

# Add date detection
nlp.add_pipe(eds.dates())
```

Let's deploy it on a large number of documents.

## What about a `for` loop?

Suppose we have a corpus of texts:

```python
text = (
    "Patient admis le 25 septembre 2021 pour suspicion de Covid.\n"
    "Pas de cas de coronavirus dans ce service.\n"
    "Le père du patient est atteint du covid."
)

corpus = [text] * 10000  # (1)
```

1. This is admittedly ugly. But you get the idea: we have a corpus of 10 000 documents we want to process...

You _could_ just apply the pipeline document by document.

```python
# ↑ Omitted code above ↑

docs = [nlp(text) for text in corpus]
```

Next, you might want to convert these documents to a DataFrame for further analysis or storage. You could do so with a loop:

```python
import pandas as pd

rows = []
for doc in docs:
    for ent in doc.ents:
        d = dict(
            begin=ent.start_char,
            end=ent.end_char,
            label=ent.label_,
            entity_text=ent.text,
            negation=ent._.negation,
            hypothesis=ent._.hypothesis,
            family=ent._.family,
        )
        rows.append(d)

    for date in doc.spans.get("dates", []):
        d = dict(
            begin=date.start_char,
            end=date.end_char,
            label="date",
            entity_text=date.text,
            datetime=date._.date.datetime,
        )
        rows.append(d)
df = pd.DataFrame(rows)
```

There are a few issues with this approach:

- If our model contains deep learning components (which it does not in this tutorial), we don't benefit from optimized batched matrix operations: ideally, we'd like to process multiple documents at once.
- We may have multiple cores available, but we don't use them to apply the pipes of our model to multiple documents at the same time.
- We would also like to perform the Doc -> Dict conversion step in parallel and avoid transferring full Doc instances back and forth between processes.
- Ideally, we would also like to switch between input/output formats, or between sequential and parallel processing, without changing the code too much.

## Streams, lazy inference and parallelization

To efficiently perform the same operations on multiple documents at once, EDS-NLP uses [streams][edsnlp.core.stream.Stream], which record the operations to perform on the documents without actually executing them directly, similar to the way Spark or Polars (with its LazyFrame) defer their computations.

This allows EDS-NLP to distribute these operations on multiple cores or machines when it is time to execute them. We can configure how the operations are run (how many jobs/workers, how many GPUs, whether to use the Spark engine) via the stream's [`set_processing()`][edsnlp.core.stream.Stream.set_processing] method.

For instance,

```python
docs = edsnlp.data.from_iterable(corpus)
print(docs)  # (1)!
```

1. Printed version of the stream:
    ```
    Stream(
      reader=IterableReader(data=<list object at 0x1084532c0>),
      ops=[],
      writer=None)
    ```

`edsnlp.data.from_iterable`, like every `edsnlp.data.read_*` or `edsnlp.data.from_*` method, returns a stream that we can iterate over or extend with more operations. To apply the model to our collection of documents, we can simply do:

```python
docs = docs.map_pipeline(nlp)
# or à la spaCy:
# docs = nlp.pipe(docs)
print(docs)  # (1)!
```

1. Printed version of the stream:
    ```
    Stream(
      reader=IterableReader(data=<list object at 0x1084532c0>),
      ops=[
        map(_ensure_doc[<edsnlp.core.pipeline.Pipeline object at 0x14f697f80>]),
        batchify(size=None, fn=None, sentinel_mode=None),
        map_batches_op(<edsnlp.pipes.core.sentences.sentences.SentenceSegmenter object at 0x14ac43ce0>),
        map_batches_op(<edsnlp.pipes.core.normalizer.normalizer.Normalizer object at 0x14c5672c0>),
        map_batches_op(<edsnlp.pipes.core.matcher.matcher.GenericMatcher object at 0x177013a40>),
        map_batches_op(<edsnlp.pipes.qualifiers.negation.negation.NegationQualifier object at 0x16a1d1550>),
        map_batches_op(<edsnlp.pipes.qualifiers.hypothesis.hypothesis.HypothesisQualifier object at 0x14ac433b0>),
        map_batches_op(<edsnlp.pipes.qualifiers.family.family.FamilyContextQualifier object at 0x14f850da0>),
        map_batches_op(<edsnlp.pipes.misc.dates.dates.DatesMatcher object at 0x1767638c0>),
        unbatchify()
      ],
    writer=None)
    ```

??? warning "SpaCy vs EDS-NLP"

    SpaCy's `nlp.pipe` method is not the same as EDS-NLP's `nlp.pipe` method: it will iterate over anything you pass to it, thereby executing the operations scheduled in our stream.

    We recommend you instantiate your models using `nlp = edsnlp.blank(...)` or `nlp = edsnlp.load(...)`.

    Otherwise, use the following to apply a spaCy model to a stream `docs` without triggering its execution:

    ```{ .python .no-check }
    docs = docs.map_pipeline(nlp)
    ```

Finally, we can convert the documents to a DataFrame (or to other formats / files) using the `edsnlp.data.to_*` or `edsnlp.data.write_*` methods. This triggers the execution of the operations scheduled in the stream and produces the rows of the DataFrame.

We can pass our previous Doc-to-Dict code as a function to the `converter` parameter of the `to_pandas` method. Note that this specific conversion is already implemented in EDS-NLP, so you can just pass the string `"ents"` to the `converter` parameter and customize the conversion by passing more parameters to the `to_pandas` method, as described [here](/data/converters#ents).

```python
def convert_doc_to_rows(doc):
    entities = []

    for ent in doc.ents:
        d = dict(
            start=ent.start_char,
            end=ent.end_char,
            label=ent.label_,
            lexical_variant=ent.text,
            negation=ent._.negation,
            hypothesis=ent._.hypothesis,
            family=ent._.family,
        )
        entities.append(d)

    for date in doc.spans.get("dates", []):
        d = dict(
            start=date.start_char,
            end=date.end_char,
            label="date",
            lexical_variant=date.text,
            datetime=date._.date.datetime,
        )
        entities.append(d)

    return entities


df = docs.to_pandas(converter=convert_doc_to_rows)
# or equivalently:
df = docs.to_pandas(
    converter="ents",
    span_getter=["ents", "dates"],
    span_attributes=[
        # span._.*** name: column name
        "negation",
        "hypothesis",
        "family",
        "date.datetime",
    ],
)
```
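
Writing to files instead of building an in-memory DataFrame follows the same pattern with the `edsnlp.data.write_*` methods. Below is a minimal sketch, assuming the stream exposes a `write_parquet` method that mirrors `to_pandas` (check the `edsnlp.data` documentation for the exact signature):

```{ .python .no-check }
# Trigger execution and write the converted rows to Parquet files
# instead of collecting them into a pandas DataFrame
docs.write_parquet(
    "note_nlp/",
    converter="ents",
    span_getter=["ents", "dates"],
)
```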

We can also iterate over the documents, which likewise triggers the execution of the operations scheduled in the stream.

```python
for doc in docs:
    # do something with the doc
    pass
```

## Processing a DataFrame

Processing text within a pandas DataFrame is a very common use case. In many applications, you'll select a corpus of documents from a distributed cluster, load it into memory and process all the texts.

!!! note "The OMOP CDM"

    In every tutorial that mentions distributing EDS-NLP over a corpus of documents,
    we will expect the data to be organised using a flavour of the
    [OMOP Common Data Model](https://ohdsi.github.io/CommonDataModel/).

    The OMOP CDM defines two tables of interest to us:

    - the [`note` table](https://ohdsi.github.io/CommonDataModel/cdm54.html#NOTE) contains the clinical notes
    - the [`note_nlp` table](https://ohdsi.github.io/CommonDataModel/cdm54.html#NOTE_NLP) holds the results of
      an NLP pipeline applied to the `note` table.

    We can use the `converter="omop"` argument to the `edsnlp.data` methods to read data in this format.
    More information about this converter can be found [here](/data/converters#omop).

To make sure we can follow along, we propose three recipes for getting the DataFrame: using a dummy dataset like before, loading a CSV, or loading a Spark DataFrame into memory.

=== "Dummy example"

    ```python
    import pandas as pd

    text = (
        "Patient admis le 25 septembre 2021 pour suspicion de Covid.\n"
        "Pas de cas de coronavirus dans ce service.\n"
        "Le père du patient est atteint du covid."
    )

    corpus = [text] * 1000

    data = pd.DataFrame(dict(note_text=corpus))
    data["note_id"] = range(len(data))
    ```

=== "Loading data from a CSV"

    ```{ .python .no-check }
    import pandas as pd

    data = pd.read_csv("note.csv")
    ```

=== "Loading data from a Spark DataFrame"

    ```{ .python .no-check }
    from pyspark.sql.session import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.sql("SELECT * FROM note")
    df = df.select("note_id", "note_text")

    data = df.limit(1000).toPandas()  # (1)
    ```

    1. We limit the size of the DataFrame to make sure we do not overwhelm our machine.

We'll see in what follows how we can efficiently deploy our pipeline on the `#!python data` object.

### Locally without parallelization

```python
# Read from a dataframe & use the omop converter
docs = edsnlp.data.from_pandas(data, converter="omop")

# Add the pipeline to operations that will be run
docs = docs.map_pipeline(nlp)

# Convert each doc to a list of dicts (one per entity)
# and store the result in a pandas DataFrame
note_nlp = docs.to_pandas(
    converter="ents",
    # Below are the arguments to the converter
    span_getter=["ents", "dates"],
    span_attributes=[  # (1)
        # span._.*** name: column name
        "negation",
        "hypothesis",
        "family",
        "date.datetime",
        # having individual columns for each date part
        # can be useful for incomplete dates (e.g., "in May")
        "date.day",
        "date.month",
        "date.year",
    ],
)
```

1. You can also pass a dict if you want to explicitly rename the attributes, as sketched below.
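
    For instance, a minimal sketch, assuming the keys are the `span._` attribute paths and the values the output column names:

    ```{ .python .no-check }
    note_nlp = docs.to_pandas(
        converter="ents",
        span_getter=["ents", "dates"],
        span_attributes={
            # span._ attribute path: output column name
            "negation": "negation",
            "date.datetime": "datetime",
        },
    )
    ```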

The result on the first note:

| note_id | start | end | label      | lexical_variant   | negation | hypothesis | family | key   |
|--------:|------:|----:|:-----------|:------------------|---------:|-----------:|-------:|:------|
|       0 |     0 |   7 | patient    | Patient           |        0 |          0 |      0 | ents  |
|       0 |   114 | 121 | patient    | patient           |        0 |          0 |      1 | ents  |
|       0 |    17 |  34 | 2021-09-25 | 25 septembre 2021 |      nan |        nan |    nan | dates |
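
Since the result is a regular pandas DataFrame, further analysis is plain pandas. For instance, an illustrative sketch (based on the columns above) that keeps only the entity mentions that are neither negated nor hypothetical:

```{ .python .no-check }
# Keep entity rows (key == "ents") that are neither negated nor hypothetical
affirmed = note_nlp[
    (note_nlp["key"] == "ents")
    & (note_nlp["negation"] == 0)
    & (note_nlp["hypothesis"] == 0)
]
```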

### Locally, using multiple parallel workers

```{ .python hl_lines="8" }
# Read from a dataframe & use the omop converter
docs = edsnlp.data.from_pandas(data, converter="omop")

# Add the pipeline to operations that will be run
docs = docs.map_pipeline(nlp)

# The operations of our stream will be distributed on multiple workers
docs = docs.set_processing(backend="multiprocessing")

# Convert each doc to a list of dicts (one per entity)
# and store the result in a pandas DataFrame
note_nlp = docs.to_pandas(
    converter="ents",
    span_getter=["ents", "dates"],
    span_attributes=[
        "negation",
        "hypothesis",
        "family",
        "date.datetime",

        # having individual columns for each date part
        # can be useful for incomplete dates (e.g., "in May")
        "date.day",
        "date.month",
        "date.year",
    ],
)
```

!!! note "Deterministic processing"

    By default, from version 0.14.0, EDS-NLP dispatches tasks to workers in a round-robin fashion to ensure deterministic processing. This mechanism can be disabled to send documents to workers as soon as they are available, which may result in faster processing but out-of-order results.

    To disable processing determinism, use `set_processing(deterministic=False)`, as sketched below. Note that this parameter is only used by the `multiprocessing` backend.
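
    For instance, a minimal sketch combining these options (the `num_cpu_workers` parameter name is an assumption to check against your EDS-NLP version):

    ```{ .python .no-check }
    docs = docs.set_processing(
        backend="multiprocessing",
        num_cpu_workers=4,      # assumed parameter: number of local worker processes
        deterministic=False,    # trade reproducible ordering for speed
    )
    ```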

### In a distributed fashion with spark

To use the Spark engine to distribute the computation, we create our stream directly from the Spark DataFrame and write the result to a new Spark DataFrame. EDS-NLP will automatically distribute the operations on the cluster (setting `backend="spark"` behind the scenes), but you can change the backend (for instance to `multiprocessing` to run locally).

!!! warning "Spark backend"

    When processing from AND to a Spark DataFrame, the backend is automatically set to "spark".

    We do NOT recommend using another backend when Spark DataFrames are involved, as there may be a discrepancy between the time it takes to process the data locally and the timeout of the Spark job.

```{ .python hl_lines="2 12" .no-check }
# Read from the pyspark dataframe & use the omop converter
docs = edsnlp.data.from_spark(df, converter="omop")

# Add the pipeline to operations that will be run
docs = docs.map_pipeline(nlp)

# Backend is set by default to "spark"
# docs = docs.set_processing(backend="spark")

# Convert each doc to a list of dicts (one per entity)
# and store the result in a pyspark DataFrame
note_nlp = docs.to_spark(
    converter="ents",
    span_getter=["ents", "dates"],
    span_attributes=[
        "negation",
        "hypothesis",
        "family",
        "date.datetime",

        # having individual columns for each date part
        # can be useful for incomplete dates (e.g., "in May")
        "date.day",
        "date.month",
        "date.year",
    ],
    dtypes=None,  # (1)
)
```

1. If you don't pass a `dtypes` argument, EDS-NLP will print the inferred schema so that you can copy-paste it into your code.
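
    As a purely hypothetical sketch, assuming `dtypes` accepts a PySpark `StructType` (prefer copy-pasting the schema printed by EDS-NLP rather than writing it by hand):

    ```{ .python .no-check }
    import pyspark.sql.types as T

    note_nlp = docs.to_spark(
        converter="ents",
        span_getter=["ents", "dates"],
        dtypes=T.StructType(
            [
                T.StructField("note_id", T.LongType()),
                T.StructField("start", T.IntegerType()),
                T.StructField("end", T.IntegerType()),
                T.StructField("label", T.StringType()),
                T.StructField("lexical_variant", T.StringType()),
            ]
        ),
    )
    ```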