Diff of /pathaia/util/convert.py [000000] .. [7823dd]

Switch to side-by-side view

--- a
+++ b/pathaia/util/convert.py
@@ -0,0 +1,401 @@
+"""
+A Module to translate pathaia csv to json annotation for micromap.
+
+It uses networkx to handle graph structures.
+"""
+import json
+import pandas as pd
+import networkx as nx
+import openslide
+import os
+from .types import PathLike
+from typing import Dict, Iterable, Tuple
+import warnings
+
+
+class Error(Exception):
+    """
+    Base of custom errors.
+
+    **********************
+    """
+
+    pass
+
+
+class BottomLeftNotFound(Error):
+    """
+    Raise when trying to access unknown level.
+
+    *********************************************
+    """
+
+    pass
+
+
+class NextPointNotFound(Error):
+    """
+    Raise when trying to access unknown level.
+
+    *********************************************
+    """
+
+    pass
+
+
+class NoPathFound(Error):
+    """
+    Raise when trying to access unknown level.
+
+    *********************************************
+    """
+
+    pass
+
+
+class OutOfBound(Error):
+    """
+    Raise when trying to access unknown level.
+
+    *********************************************
+    """
+
+    pass
+
+
+colorCycle = [
+    "#f44336", "#8bc34a", "#ffeb3b", "#673ab7", "#e91e63", "#cddc39", "#9c27b0",
+    "#ffc107", "#3f51b5", "#ff9800", "#2196f3", "#ff5722", "#03a9f4", "#795548",
+    "#00bcd4", "#607d8b", "#009688", "#4caf50"
+]
+
+
+def handle_predicted_patches(
+    patch_file: PathLike,
+    level: int,
+    column: str
+):
+    """
+    Read a patch file.
+
+    Read lines of the patch csv looking for 'column' label.
+    Args:
+        patch_file (str): absolute path to a csv patch file.
+        level (int): pyramid level to query patches in the csv.
+        column (str): header of the column to use to label individual patches.
+    Yields:
+        tuple: position and label of patches (x, y, label).
+
+    """
+    df = pd.read_csv(patch_file)
+    level_df = df[df["level"] == level]
+    for _, row in level_df.iterrows():
+        yield row["x"], row["y"], row["dx"], row[column], column
+
+
+def get_category(
+    val: float,
+    thresholds: Dict[int, Tuple[float, float]]
+):
+    """
+    Return the break-apart categorical label from estimation.
+
+    *********************************************************
+    """
+    for label, bounds in thresholds.items():
+        low, high = bounds
+        if val >= low and val < high:
+            return label
+    raise OutOfBound(
+        "Value: {} is out of bounds for thresholds: {}!".format(val, thresholds)
+    )
+
+
+def gen_categorical_from_floatpreds(
+    patch_file: PathLike,
+    level: int,
+    column: str,
+    thresholds: Dict[int, Tuple[float, float]]
+):
+    """
+    Yield categorical patches from float predictions.
+
+    *************************************************
+    """
+    for patch in handle_predicted_patches(
+        patch_file, level, column
+    ):
+        x, y, d, val, author = patch
+        try:
+            yield x, y, d, get_category(val, thresholds), author
+        except OutOfBound:
+            pass
+
+
+def get_categorical_layer_edges(
+    categorical_patch_generator: Iterable,
+    color_dict: Dict[int, str],
+    classname_dict: Dict[int, str]
+):
+    """
+    Create graph features from patches for every layer of annotation.
+
+    *******************************************************************
+    """
+    layer_nodes = dict()
+    layer_edges = dict()
+    layer_meta = dict()
+    interval = None
+    for patch in categorical_patch_generator:
+        x, y, d, cl, author = patch
+        if interval is None:
+            interval = d
+        cl_name = classname_dict[cl]
+        if cl_name not in layer_nodes:
+            layer_nodes[cl_name] = set()
+            layer_meta[cl_name] = {
+                "label": "{}_{}".format(author, cl_name),
+                "author": "{}".format(author),
+                "text": "",
+                "color": color_dict[cl],
+                "date": ""
+            }
+        # (x, y) is just the top left corner, to plot the polygon,
+        # we will need the four corners
+        layer_nodes[cl_name].add((x, y))
+        layer_nodes[cl_name].add((x + interval, y))
+        layer_nodes[cl_name].add((x, y + interval))
+        layer_nodes[cl_name].add((x + interval, y + interval))
+    for layer, nodes in layer_nodes.items():
+        layer_edges[layer] = set()
+        for node in nodes:
+            x, y = node
+            for neighbor in [
+                (x + interval, y),
+                (x - interval, y),
+                (x, y + interval),
+                (x, y - interval),
+                (x - interval, y - interval),
+                (x - interval, y + interval),
+                (x + interval, y + interval),
+                (x + interval, y - interval)
+            ]:
+                if neighbor in nodes:
+                    layer_edges[layer].add((node, neighbor))
+    return layer_edges, layer_meta, interval
+
+
+def get_categorical_segments_from_edges(layer_edges: Dict):
+    """
+    Create segments from layer edges.
+
+    *********************************
+    """
+    layer_segments = dict()
+    for layer, edges in layer_edges.items():
+        layer_segments[layer] = []
+        # create a graph
+        layer_graph = nx.Graph()
+        layer_graph.add_edges_from(edges)
+        for c in nx.connected_components(layer_graph):
+            layer_segments[layer].append(layer_graph.subgraph(c).copy())
+    return layer_segments
+
+
+def get_contour_points(segment, adj=8):
+    """
+    Find contour points of a segment.
+
+    *********************************
+    """
+    contour = []
+    for pt in segment.nodes:
+        if segment.degree[pt] < adj:
+            contour.append(pt)
+    return contour
+
+
+def convert_coord(coord, slide_dims):
+    """
+    Compute relative coords from abs.
+
+    *********************************
+    """
+    sx, sy = slide_dims
+    x, y = coord
+    return float(x) / sx, float(y) / sy
+
+
+def find_bottom_left(pts):
+    """
+    Find bottom left point.
+
+    ***********************
+    """
+    ymax = max([pt[1] for pt in pts])
+    xmin = 100000000000
+    bl = None
+    for pt in pts:
+        x, y = pt
+        if y == ymax:
+            if x <= xmin:
+                bl = x, y
+    if bl is not None:
+        return bl
+    raise BottomLeftNotFound("Did not find bottom left point of the cloud !!!")
+
+
+def turn_left(orientation):
+    """
+    Compute a new orientation after turning on the left.
+
+    **************************************************
+    """
+    new_orientation = dict()
+    new_orientation["front"] = orientation["left"]
+    new_orientation["left"] = orientation["back"]
+    new_orientation["back"] = orientation["right"]
+    new_orientation["right"] = orientation["front"]
+    return new_orientation
+
+
+def turn_right(orientation):
+    """
+    Compute a new orientation after turning on the right.
+
+    *****************************************************
+    """
+    new_orientation = dict()
+    new_orientation["front"] = orientation["right"]
+    new_orientation["left"] = orientation["front"]
+    new_orientation["back"] = orientation["left"]
+    new_orientation["right"] = orientation["back"]
+    return new_orientation
+
+
+def turn_back(orientation):
+    """
+    Compute a new orientation after turning back.
+
+    *********************************************
+    """
+    # basically, it's just 'turn_right' twice...
+    new_orientation = turn_right(orientation)
+    new_new_orientation = turn_right(new_orientation)
+    return new_new_orientation
+
+
+def go_to_next_point(pt, orientation, perimeter):
+    """
+    Compute next point in the path.
+
+    *******************************
+    """
+    x, y = pt
+    front = x + orientation["front"][0], y + orientation["front"][1]
+    left = x + orientation["left"][0], y + orientation["left"][1]
+    right = x + orientation["right"][0], y + orientation["right"][1]
+    back = x + orientation["back"][0], y + orientation["back"][1]
+    if left in perimeter:
+        new_orientation = turn_left(orientation)
+        return left, new_orientation
+    if front in perimeter:
+        return front, orientation
+    if right in perimeter:
+        new_orientation = turn_right(orientation)
+        return right, new_orientation
+    if back in perimeter:
+        new_orientation = turn_back(orientation)
+        return back, new_orientation
+    raise NextPointNotFound(
+        "Point {} \nhas no next point in neighborhood {} \nthat is in perimeter {}".format(
+            pt, {"left": left, "front": front, "right": right, "back": back}, perimeter
+        )
+    )
+
+
+def compute_path(pts, d):
+    """
+    Compute the path around a segment.
+
+    **********************************
+    """
+    path = []
+    # first set remaining points to the whole cloud
+    perimeter = set(pts)
+    # find the bottom left point
+    start_point = find_bottom_left(perimeter)
+    path.append(start_point)
+    # set the initial orientation
+    start_orientation = {
+        "front": (0, -d),
+        "left": (-d, 0),
+        "back": (0, d),
+        "right": (d, 0)
+    }
+    current_point, orientation = go_to_next_point(
+        start_point, start_orientation, perimeter
+    )
+    while current_point != start_point:
+        path.append(current_point)
+        # remaining.remove(current_point)
+        next_point, next_orientation = go_to_next_point(
+            current_point, orientation, perimeter
+        )
+        current_point = next_point
+        orientation = next_orientation
+    if len(path) > 0:
+        return path
+    else:
+        raise NoPathFound(
+            "No path found for {}, with interval {}!!!".format(pts, d)
+        )
+
+
+def layer_segment_to_json_struct(
+    interval,
+    layer_segments,
+    layer_meta,
+    slide
+):
+    """
+    Create the json annotation file from the segments.
+
+    **************************************************
+    """
+    slide_id = os.path.basename(slide._filename)
+    # annotations = {"slide_id": slide_id, "layers": dict()}
+    annotations = {"slide_id": slide_id, "layers": []}
+    for layer, segments in layer_segments.items():
+        meta = layer_meta[layer]
+        layer_annotation = {
+            "id": meta["label"],
+            "color": meta["color"],
+            "shapes": []
+        }
+        # annotations["layers"][layer] = dict()
+        for idx, segment in enumerate(segments):
+            # create one annotation by segment
+            try:
+                contour = get_contour_points(segment, adj=8)
+                polygon = compute_path(contour, interval)
+                polygon = [convert_coord(
+                    pt, slide.dimensions
+                ) for pt in polygon]
+                shape = {
+                    "points": [
+                        {"x": x * 100, "y": y * 100,
+                         "status": "written"} for x, y in polygon
+                    ],
+                    "id": str(idx),
+                    "author": meta["author"],
+                    "text": meta["text"],
+                    "color": meta["color"],
+                    "label": meta["label"],
+                    "date": meta["date"]
+                }
+                layer_annotation["shapes"].append(shape)
+            except (NoPathFound, NextPointNotFound, BottomLeftNotFound) as e:
+                warnings.warn(str(e))
+        annotations["layers"].append(layer_annotation)
+    return annotations