--- a +++ b/pathaia/util/convert.py @@ -0,0 +1,401 @@ +""" +A Module to translate pathaia csv to json annotation for micromap. + +It uses networkx to handle graph structures. +""" +import json +import pandas as pd +import networkx as nx +import openslide +import os +from .types import PathLike +from typing import Dict, Iterable, Tuple +import warnings + + +class Error(Exception): + """ + Base of custom errors. + + ********************** + """ + + pass + + +class BottomLeftNotFound(Error): + """ + Raise when trying to access unknown level. + + ********************************************* + """ + + pass + + +class NextPointNotFound(Error): + """ + Raise when trying to access unknown level. + + ********************************************* + """ + + pass + + +class NoPathFound(Error): + """ + Raise when trying to access unknown level. + + ********************************************* + """ + + pass + + +class OutOfBound(Error): + """ + Raise when trying to access unknown level. + + ********************************************* + """ + + pass + + +colorCycle = [ + "#f44336", "#8bc34a", "#ffeb3b", "#673ab7", "#e91e63", "#cddc39", "#9c27b0", + "#ffc107", "#3f51b5", "#ff9800", "#2196f3", "#ff5722", "#03a9f4", "#795548", + "#00bcd4", "#607d8b", "#009688", "#4caf50" +] + + +def handle_predicted_patches( + patch_file: PathLike, + level: int, + column: str +): + """ + Read a patch file. + + Read lines of the patch csv looking for 'column' label. + Args: + patch_file (str): absolute path to a csv patch file. + level (int): pyramid level to query patches in the csv. + column (str): header of the column to use to label individual patches. + Yields: + tuple: position and label of patches (x, y, label). + + """ + df = pd.read_csv(patch_file) + level_df = df[df["level"] == level] + for _, row in level_df.iterrows(): + yield row["x"], row["y"], row["dx"], row[column], column + + +def get_category( + val: float, + thresholds: Dict[int, Tuple[float, float]] +): + """ + Return the break-apart categorical label from estimation. + + ********************************************************* + """ + for label, bounds in thresholds.items(): + low, high = bounds + if val >= low and val < high: + return label + raise OutOfBound( + "Value: {} is out of bounds for thresholds: {}!".format(val, thresholds) + ) + + +def gen_categorical_from_floatpreds( + patch_file: PathLike, + level: int, + column: str, + thresholds: Dict[int, Tuple[float, float]] +): + """ + Yield categorical patches from float predictions. + + ************************************************* + """ + for patch in handle_predicted_patches( + patch_file, level, column + ): + x, y, d, val, author = patch + try: + yield x, y, d, get_category(val, thresholds), author + except OutOfBound: + pass + + +def get_categorical_layer_edges( + categorical_patch_generator: Iterable, + color_dict: Dict[int, str], + classname_dict: Dict[int, str] +): + """ + Create graph features from patches for every layer of annotation. + + ******************************************************************* + """ + layer_nodes = dict() + layer_edges = dict() + layer_meta = dict() + interval = None + for patch in categorical_patch_generator: + x, y, d, cl, author = patch + if interval is None: + interval = d + cl_name = classname_dict[cl] + if cl_name not in layer_nodes: + layer_nodes[cl_name] = set() + layer_meta[cl_name] = { + "label": "{}_{}".format(author, cl_name), + "author": "{}".format(author), + "text": "", + "color": color_dict[cl], + "date": "" + } + # (x, y) is just the top left corner, to plot the polygon, + # we will need the four corners + layer_nodes[cl_name].add((x, y)) + layer_nodes[cl_name].add((x + interval, y)) + layer_nodes[cl_name].add((x, y + interval)) + layer_nodes[cl_name].add((x + interval, y + interval)) + for layer, nodes in layer_nodes.items(): + layer_edges[layer] = set() + for node in nodes: + x, y = node + for neighbor in [ + (x + interval, y), + (x - interval, y), + (x, y + interval), + (x, y - interval), + (x - interval, y - interval), + (x - interval, y + interval), + (x + interval, y + interval), + (x + interval, y - interval) + ]: + if neighbor in nodes: + layer_edges[layer].add((node, neighbor)) + return layer_edges, layer_meta, interval + + +def get_categorical_segments_from_edges(layer_edges: Dict): + """ + Create segments from layer edges. + + ********************************* + """ + layer_segments = dict() + for layer, edges in layer_edges.items(): + layer_segments[layer] = [] + # create a graph + layer_graph = nx.Graph() + layer_graph.add_edges_from(edges) + for c in nx.connected_components(layer_graph): + layer_segments[layer].append(layer_graph.subgraph(c).copy()) + return layer_segments + + +def get_contour_points(segment, adj=8): + """ + Find contour points of a segment. + + ********************************* + """ + contour = [] + for pt in segment.nodes: + if segment.degree[pt] < adj: + contour.append(pt) + return contour + + +def convert_coord(coord, slide_dims): + """ + Compute relative coords from abs. + + ********************************* + """ + sx, sy = slide_dims + x, y = coord + return float(x) / sx, float(y) / sy + + +def find_bottom_left(pts): + """ + Find bottom left point. + + *********************** + """ + ymax = max([pt[1] for pt in pts]) + xmin = 100000000000 + bl = None + for pt in pts: + x, y = pt + if y == ymax: + if x <= xmin: + bl = x, y + if bl is not None: + return bl + raise BottomLeftNotFound("Did not find bottom left point of the cloud !!!") + + +def turn_left(orientation): + """ + Compute a new orientation after turning on the left. + + ************************************************** + """ + new_orientation = dict() + new_orientation["front"] = orientation["left"] + new_orientation["left"] = orientation["back"] + new_orientation["back"] = orientation["right"] + new_orientation["right"] = orientation["front"] + return new_orientation + + +def turn_right(orientation): + """ + Compute a new orientation after turning on the right. + + ***************************************************** + """ + new_orientation = dict() + new_orientation["front"] = orientation["right"] + new_orientation["left"] = orientation["front"] + new_orientation["back"] = orientation["left"] + new_orientation["right"] = orientation["back"] + return new_orientation + + +def turn_back(orientation): + """ + Compute a new orientation after turning back. + + ********************************************* + """ + # basically, it's just 'turn_right' twice... + new_orientation = turn_right(orientation) + new_new_orientation = turn_right(new_orientation) + return new_new_orientation + + +def go_to_next_point(pt, orientation, perimeter): + """ + Compute next point in the path. + + ******************************* + """ + x, y = pt + front = x + orientation["front"][0], y + orientation["front"][1] + left = x + orientation["left"][0], y + orientation["left"][1] + right = x + orientation["right"][0], y + orientation["right"][1] + back = x + orientation["back"][0], y + orientation["back"][1] + if left in perimeter: + new_orientation = turn_left(orientation) + return left, new_orientation + if front in perimeter: + return front, orientation + if right in perimeter: + new_orientation = turn_right(orientation) + return right, new_orientation + if back in perimeter: + new_orientation = turn_back(orientation) + return back, new_orientation + raise NextPointNotFound( + "Point {} \nhas no next point in neighborhood {} \nthat is in perimeter {}".format( + pt, {"left": left, "front": front, "right": right, "back": back}, perimeter + ) + ) + + +def compute_path(pts, d): + """ + Compute the path around a segment. + + ********************************** + """ + path = [] + # first set remaining points to the whole cloud + perimeter = set(pts) + # find the bottom left point + start_point = find_bottom_left(perimeter) + path.append(start_point) + # set the initial orientation + start_orientation = { + "front": (0, -d), + "left": (-d, 0), + "back": (0, d), + "right": (d, 0) + } + current_point, orientation = go_to_next_point( + start_point, start_orientation, perimeter + ) + while current_point != start_point: + path.append(current_point) + # remaining.remove(current_point) + next_point, next_orientation = go_to_next_point( + current_point, orientation, perimeter + ) + current_point = next_point + orientation = next_orientation + if len(path) > 0: + return path + else: + raise NoPathFound( + "No path found for {}, with interval {}!!!".format(pts, d) + ) + + +def layer_segment_to_json_struct( + interval, + layer_segments, + layer_meta, + slide +): + """ + Create the json annotation file from the segments. + + ************************************************** + """ + slide_id = os.path.basename(slide._filename) + # annotations = {"slide_id": slide_id, "layers": dict()} + annotations = {"slide_id": slide_id, "layers": []} + for layer, segments in layer_segments.items(): + meta = layer_meta[layer] + layer_annotation = { + "id": meta["label"], + "color": meta["color"], + "shapes": [] + } + # annotations["layers"][layer] = dict() + for idx, segment in enumerate(segments): + # create one annotation by segment + try: + contour = get_contour_points(segment, adj=8) + polygon = compute_path(contour, interval) + polygon = [convert_coord( + pt, slide.dimensions + ) for pt in polygon] + shape = { + "points": [ + {"x": x * 100, "y": y * 100, + "status": "written"} for x, y in polygon + ], + "id": str(idx), + "author": meta["author"], + "text": meta["text"], + "color": meta["color"], + "label": meta["label"], + "date": meta["date"] + } + layer_annotation["shapes"].append(shape) + except (NoPathFound, NextPointNotFound, BottomLeftNotFound) as e: + warnings.warn(str(e)) + annotations["layers"].append(layer_annotation) + return annotations