Diff of /pathaia/util/convert.py [000000] .. [7823dd]

Switch to unified view

a b/pathaia/util/convert.py
1
"""
2
A Module to translate pathaia csv to json annotation for micromap.
3
4
It uses networkx to handle graph structures.
5
"""
6
import json
7
import pandas as pd
8
import networkx as nx
9
import openslide
10
import os
11
from .types import PathLike
12
from typing import Dict, Iterable, Tuple
13
import warnings
14
15
16
class Error(Exception):
17
    """
18
    Base of custom errors.
19
20
    **********************
21
    """
22
23
    pass
24
25
26
class BottomLeftNotFound(Error):
27
    """
28
    Raise when trying to access unknown level.
29
30
    *********************************************
31
    """
32
33
    pass
34
35
36
class NextPointNotFound(Error):
37
    """
38
    Raise when trying to access unknown level.
39
40
    *********************************************
41
    """
42
43
    pass
44
45
46
class NoPathFound(Error):
47
    """
48
    Raise when trying to access unknown level.
49
50
    *********************************************
51
    """
52
53
    pass
54
55
56
class OutOfBound(Error):
57
    """
58
    Raise when trying to access unknown level.
59
60
    *********************************************
61
    """
62
63
    pass
64
65
66
colorCycle = [
67
    "#f44336", "#8bc34a", "#ffeb3b", "#673ab7", "#e91e63", "#cddc39", "#9c27b0",
68
    "#ffc107", "#3f51b5", "#ff9800", "#2196f3", "#ff5722", "#03a9f4", "#795548",
69
    "#00bcd4", "#607d8b", "#009688", "#4caf50"
70
]
71
72
73
def handle_predicted_patches(
74
    patch_file: PathLike,
75
    level: int,
76
    column: str
77
):
78
    """
79
    Read a patch file.
80
81
    Read lines of the patch csv looking for 'column' label.
82
    Args:
83
        patch_file (str): absolute path to a csv patch file.
84
        level (int): pyramid level to query patches in the csv.
85
        column (str): header of the column to use to label individual patches.
86
    Yields:
87
        tuple: position and label of patches (x, y, label).
88
89
    """
90
    df = pd.read_csv(patch_file)
91
    level_df = df[df["level"] == level]
92
    for _, row in level_df.iterrows():
93
        yield row["x"], row["y"], row["dx"], row[column], column
94
95
96
def get_category(
97
    val: float,
98
    thresholds: Dict[int, Tuple[float, float]]
99
):
100
    """
101
    Return the break-apart categorical label from estimation.
102
103
    *********************************************************
104
    """
105
    for label, bounds in thresholds.items():
106
        low, high = bounds
107
        if val >= low and val < high:
108
            return label
109
    raise OutOfBound(
110
        "Value: {} is out of bounds for thresholds: {}!".format(val, thresholds)
111
    )
112
113
114
def gen_categorical_from_floatpreds(
115
    patch_file: PathLike,
116
    level: int,
117
    column: str,
118
    thresholds: Dict[int, Tuple[float, float]]
119
):
120
    """
121
    Yield categorical patches from float predictions.
122
123
    *************************************************
124
    """
125
    for patch in handle_predicted_patches(
126
        patch_file, level, column
127
    ):
128
        x, y, d, val, author = patch
129
        try:
130
            yield x, y, d, get_category(val, thresholds), author
131
        except OutOfBound:
132
            pass
133
134
135
def get_categorical_layer_edges(
136
    categorical_patch_generator: Iterable,
137
    color_dict: Dict[int, str],
138
    classname_dict: Dict[int, str]
139
):
140
    """
141
    Create graph features from patches for every layer of annotation.
142
143
    *******************************************************************
144
    """
145
    layer_nodes = dict()
146
    layer_edges = dict()
147
    layer_meta = dict()
148
    interval = None
149
    for patch in categorical_patch_generator:
150
        x, y, d, cl, author = patch
151
        if interval is None:
152
            interval = d
153
        cl_name = classname_dict[cl]
154
        if cl_name not in layer_nodes:
155
            layer_nodes[cl_name] = set()
156
            layer_meta[cl_name] = {
157
                "label": "{}_{}".format(author, cl_name),
158
                "author": "{}".format(author),
159
                "text": "",
160
                "color": color_dict[cl],
161
                "date": ""
162
            }
163
        # (x, y) is just the top left corner, to plot the polygon,
164
        # we will need the four corners
165
        layer_nodes[cl_name].add((x, y))
166
        layer_nodes[cl_name].add((x + interval, y))
167
        layer_nodes[cl_name].add((x, y + interval))
168
        layer_nodes[cl_name].add((x + interval, y + interval))
169
    for layer, nodes in layer_nodes.items():
170
        layer_edges[layer] = set()
171
        for node in nodes:
172
            x, y = node
173
            for neighbor in [
174
                (x + interval, y),
175
                (x - interval, y),
176
                (x, y + interval),
177
                (x, y - interval),
178
                (x - interval, y - interval),
179
                (x - interval, y + interval),
180
                (x + interval, y + interval),
181
                (x + interval, y - interval)
182
            ]:
183
                if neighbor in nodes:
184
                    layer_edges[layer].add((node, neighbor))
185
    return layer_edges, layer_meta, interval
186
187
188
def get_categorical_segments_from_edges(layer_edges: Dict):
189
    """
190
    Create segments from layer edges.
191
192
    *********************************
193
    """
194
    layer_segments = dict()
195
    for layer, edges in layer_edges.items():
196
        layer_segments[layer] = []
197
        # create a graph
198
        layer_graph = nx.Graph()
199
        layer_graph.add_edges_from(edges)
200
        for c in nx.connected_components(layer_graph):
201
            layer_segments[layer].append(layer_graph.subgraph(c).copy())
202
    return layer_segments
203
204
205
def get_contour_points(segment, adj=8):
206
    """
207
    Find contour points of a segment.
208
209
    *********************************
210
    """
211
    contour = []
212
    for pt in segment.nodes:
213
        if segment.degree[pt] < adj:
214
            contour.append(pt)
215
    return contour
216
217
218
def convert_coord(coord, slide_dims):
219
    """
220
    Compute relative coords from abs.
221
222
    *********************************
223
    """
224
    sx, sy = slide_dims
225
    x, y = coord
226
    return float(x) / sx, float(y) / sy
227
228
229
def find_bottom_left(pts):
230
    """
231
    Find bottom left point.
232
233
    ***********************
234
    """
235
    ymax = max([pt[1] for pt in pts])
236
    xmin = 100000000000
237
    bl = None
238
    for pt in pts:
239
        x, y = pt
240
        if y == ymax:
241
            if x <= xmin:
242
                bl = x, y
243
    if bl is not None:
244
        return bl
245
    raise BottomLeftNotFound("Did not find bottom left point of the cloud !!!")
246
247
248
def turn_left(orientation):
249
    """
250
    Compute a new orientation after turning on the left.
251
252
    **************************************************
253
    """
254
    new_orientation = dict()
255
    new_orientation["front"] = orientation["left"]
256
    new_orientation["left"] = orientation["back"]
257
    new_orientation["back"] = orientation["right"]
258
    new_orientation["right"] = orientation["front"]
259
    return new_orientation
260
261
262
def turn_right(orientation):
263
    """
264
    Compute a new orientation after turning on the right.
265
266
    *****************************************************
267
    """
268
    new_orientation = dict()
269
    new_orientation["front"] = orientation["right"]
270
    new_orientation["left"] = orientation["front"]
271
    new_orientation["back"] = orientation["left"]
272
    new_orientation["right"] = orientation["back"]
273
    return new_orientation
274
275
276
def turn_back(orientation):
277
    """
278
    Compute a new orientation after turning back.
279
280
    *********************************************
281
    """
282
    # basically, it's just 'turn_right' twice...
283
    new_orientation = turn_right(orientation)
284
    new_new_orientation = turn_right(new_orientation)
285
    return new_new_orientation
286
287
288
def go_to_next_point(pt, orientation, perimeter):
289
    """
290
    Compute next point in the path.
291
292
    *******************************
293
    """
294
    x, y = pt
295
    front = x + orientation["front"][0], y + orientation["front"][1]
296
    left = x + orientation["left"][0], y + orientation["left"][1]
297
    right = x + orientation["right"][0], y + orientation["right"][1]
298
    back = x + orientation["back"][0], y + orientation["back"][1]
299
    if left in perimeter:
300
        new_orientation = turn_left(orientation)
301
        return left, new_orientation
302
    if front in perimeter:
303
        return front, orientation
304
    if right in perimeter:
305
        new_orientation = turn_right(orientation)
306
        return right, new_orientation
307
    if back in perimeter:
308
        new_orientation = turn_back(orientation)
309
        return back, new_orientation
310
    raise NextPointNotFound(
311
        "Point {} \nhas no next point in neighborhood {} \nthat is in perimeter {}".format(
312
            pt, {"left": left, "front": front, "right": right, "back": back}, perimeter
313
        )
314
    )
315
316
317
def compute_path(pts, d):
318
    """
319
    Compute the path around a segment.
320
321
    **********************************
322
    """
323
    path = []
324
    # first set remaining points to the whole cloud
325
    perimeter = set(pts)
326
    # find the bottom left point
327
    start_point = find_bottom_left(perimeter)
328
    path.append(start_point)
329
    # set the initial orientation
330
    start_orientation = {
331
        "front": (0, -d),
332
        "left": (-d, 0),
333
        "back": (0, d),
334
        "right": (d, 0)
335
    }
336
    current_point, orientation = go_to_next_point(
337
        start_point, start_orientation, perimeter
338
    )
339
    while current_point != start_point:
340
        path.append(current_point)
341
        # remaining.remove(current_point)
342
        next_point, next_orientation = go_to_next_point(
343
            current_point, orientation, perimeter
344
        )
345
        current_point = next_point
346
        orientation = next_orientation
347
    if len(path) > 0:
348
        return path
349
    else:
350
        raise NoPathFound(
351
            "No path found for {}, with interval {}!!!".format(pts, d)
352
        )
353
354
355
def layer_segment_to_json_struct(
356
    interval,
357
    layer_segments,
358
    layer_meta,
359
    slide
360
):
361
    """
362
    Create the json annotation file from the segments.
363
364
    **************************************************
365
    """
366
    slide_id = os.path.basename(slide._filename)
367
    # annotations = {"slide_id": slide_id, "layers": dict()}
368
    annotations = {"slide_id": slide_id, "layers": []}
369
    for layer, segments in layer_segments.items():
370
        meta = layer_meta[layer]
371
        layer_annotation = {
372
            "id": meta["label"],
373
            "color": meta["color"],
374
            "shapes": []
375
        }
376
        # annotations["layers"][layer] = dict()
377
        for idx, segment in enumerate(segments):
378
            # create one annotation by segment
379
            try:
380
                contour = get_contour_points(segment, adj=8)
381
                polygon = compute_path(contour, interval)
382
                polygon = [convert_coord(
383
                    pt, slide.dimensions
384
                ) for pt in polygon]
385
                shape = {
386
                    "points": [
387
                        {"x": x * 100, "y": y * 100,
388
                         "status": "written"} for x, y in polygon
389
                    ],
390
                    "id": str(idx),
391
                    "author": meta["author"],
392
                    "text": meta["text"],
393
                    "color": meta["color"],
394
                    "label": meta["label"],
395
                    "date": meta["date"]
396
                }
397
                layer_annotation["shapes"].append(shape)
398
            except (NoPathFound, NextPointNotFound, BottomLeftNotFound) as e:
399
                warnings.warn(str(e))
400
        annotations["layers"].append(layer_annotation)
401
    return annotations