# Spark

??? abstract "TLDR"

    ```{ .python .no-check }
    import edsnlp

    stream = edsnlp.data.from_spark(df, converter="omop")
    stream = stream.map_pipeline(nlp)
    res = stream.to_spark(converter="omop")
    # or equivalently
    edsnlp.data.to_spark(stream, converter="omop")
    ```

We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.

As an example, imagine that we have the following OMOP dataframe (we'll name it `note_df`):

| note_id | note_text                                     | note_datetime |
|--------:|:----------------------------------------------|:--------------|
|       0 | Le patient est admis pour une pneumopathie... | 2021-10-23    |

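For illustration, such a dataframe can be created from an in-memory record. This sketch assumes an active `SparkSession` named `spark`; the DDL schema string is just one way to declare the columns:

```{ .python .no-check }
note_df = spark.createDataFrame(
    [(0, "Le patient est admis pour une pneumopathie...", "2021-10-23")],
    schema="note_id LONG, note_text STRING, note_datetime STRING",
)
```
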
## Reading from a Spark DataFrame {: #edsnlp.data.spark.from_spark }

::: edsnlp.data.spark.from_spark
    options:
        heading_level: 3
        show_source: false
        show_toc: false
        show_bases: false

## Writing to a Spark DataFrame {: #edsnlp.data.spark.to_spark }

::: edsnlp.data.spark.to_spark
    options:
        heading_level: 3
        show_source: false
        show_toc: false
        show_bases: false

## Importing entities from a Spark DataFrame

If you have a dataframe with entities (e.g., `note_nlp` in OMOP), you must join it with the dataframe containing the raw text (e.g., `note` in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entity dataframe, which we will name `note_nlp_df`:

| note_nlp_id | note_id | start_char | end_char | note_nlp_source_value | lexical_variant |
|------------:|--------:|-----------:|---------:|:----------------------|:----------------|
|           0 |       0 |         46 |       57 | disease               | coronavirus     |
|           1 |       0 |         77 |       88 | drug                  | paracétamol     |

```{ .python .no-check }
import pyspark.sql.functions as F

# Aggregate the entities of each note into a single "entities" column,
# then join the result with the raw text, keeping one row per note
df = note_df.join(
    note_nlp_df
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value"),
            )
        ).alias("entities")
    ),
    on="note_id",
    how="left",
)
```
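
The joined dataframe can then be processed like any other OMOP dataframe. A minimal sketch, reusing the pattern from the TLDR above and assuming an already-instantiated pipeline `nlp`:

```{ .python .no-check }
import edsnlp

# Each row now carries both the raw text and its pre-annotated entities
stream = edsnlp.data.from_spark(df, converter="omop")
stream = stream.map_pipeline(nlp)
edsnlp.data.to_spark(stream, converter="omop")
```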