a b/brats_toolkit/segmentor.py
1
# -*- coding: utf-8 -*-
2
# Author: Christoph Berger
3
# Script for evaluation and bulk segmentation of Brain Tumor Scans
4
# using the MICCAI BRATS algorithmic repository
5
#
6
# Please refer to README.md and LICENSE.md for further documentation
7
# This software is not certified for clinical use.
8
9
__version__ = "0.1"
10
__author__ = "Christoph Berger"
11
12
import errno
13
import glob
14
import json
15
import logging
16
import os
17
import os.path as op
18
import subprocess
19
import sys
20
import tempfile
21
22
import numpy as np
23
24
from . import fusionator
25
from .util import filemanager as fm
26
from .util import own_itk as oitk
27
from .util.citation_reminder import citation_reminder, new_segmentor_note
28
29
30
class Segmentor(object):
31
    """
32
    Now does it all!
33
    """
34
35
    @new_segmentor_note
36
    @citation_reminder
37
    def __init__(
38
        self,
39
        config=None,
40
        fileformats=None,
41
        verbose=True,
42
        tty=False,
43
        newdocker=True,
44
        gpu="0",
45
    ):
46
        """Init the orchestra class with placeholders"""
47
        self.noOfContainers = 0
48
        self.config = []
49
        self.directory = None
50
        self.verbose = verbose
51
        self.tty = tty
52
        self.dockerGPU = newdocker
53
        self.gpu = gpu
54
        self.package_directory = op.dirname(op.abspath(__file__))
55
        # set environment variables to limit GPU usage
56
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"  # see issue #152
57
        os.environ["CUDA_VISIBLE_DEVICES"] = gpu
58
        if config is None:
59
            config = op.join(self.package_directory, "config", "dockers.json")
60
        if fileformats is None:
61
            self.fileformats = op.join(
62
                self.package_directory, "config", "fileformats.json"
63
            )
64
        else:
65
            self.fileformats = fileformats
66
        try:
67
            configfile = open(config, "r")
68
            self.config = json.load(configfile)
69
            self.noOfContainers = len(self.config.keys())
70
            configfile.close()
71
        except IOError as e:
72
            logging.exception("I/O error({0}): {1}".format(e.errno, e.strerror))
73
            raise
74
        except ValueError:
75
            logging.exception("Invalid configuration file")
76
            raise
77
        except:
78
            logging.exception("Unexpected Error!")
79
            raise
80
81
    def _getFileFormat(self, index):
82
        return self.config[index]["fileformat"]
83
84
    def _getContainerName(self, index):
85
        return self.config[index]["name"]
86
87
    def _getNumberOfContainers(self):
88
        return len(self.config)
89
90
    def _runDummyContainer(self, stop=False):
91
        command = "docker run --rm -it hello-world"
92
        subprocess.check_call(command, shell=True)
93
94
    def _runContainer(self, id, directory, outputDir, outputName):
95
        """
96
        Runs one container on one patient folder
97
        """
98
        logging.info("Now running a segmentation with the Docker {}.".format(id))
99
        logging.info("Output will be in {}.".format(outputDir))
100
101
        params = self.config[id]  # only references, doesn't copy
102
        command = "docker run --rm"
103
        # assemble the rest of the command
104
        flags = params.get("flags", "")
105
        # check if we need to map the user
106
        if params.get("user_mode", False):
107
            user_flags = "--user $(id -u):$(id -g)"
108
        else:
109
            user_flags = ""
110
        # assemble the gpu flags if needed
111
        if params["runtime"] == "nvidia":
112
            if self.dockerGPU:
113
                # TODO clean this up
114
                gpu_flags = "--gpus device=" + str(self.gpu)
115
            else:
116
                gpu_flags = "--runtime=nvidia -e CUDA_VISIBLE_DEVICES=" + str(self.gpu)
117
        else:
118
            gpu_flags = ""
119
        # assemble directory mapping
120
        volume = "-v " + str(directory) + ":" + str(params["mountpoint"])
121
        # assemble execution command
122
        call = str(params["command"])
123
124
        # stick everything together
125
        command = (
126
            command
127
            + " "
128
            + user_flags
129
            + " "
130
            + gpu_flags
131
            + " "
132
            + flags
133
            + " "
134
            + volume
135
            + " "
136
            + params["id"]
137
            + " "
138
            + call
139
        )
140
141
        if self.verbose:
142
            print("Executing: {}".format(command))
143
        try:
144
            with open(
145
                op.join(outputDir, "{}_output.log".format(outputName.split(".")[0])),
146
                "w",
147
            ) as f:
148
                subprocess.check_call(command, shell=True, stdout=f)
149
        except Exception as e:
150
            logging.error(
151
                "Segmentation failed for case {} with error: {}".format(directory, e)
152
            )
153
            if "exit status 125" in str(e):
154
                logging.error(
155
                    "DOCKER DAEMON not running! Please start your Docker runtime."
156
                )
157
                sys.exit(125)
158
            return False
159
        if self.verbose:
160
            logging.info("Container exited without error")
161
        # fileh.close()
162
        return True
163
164
    def _runIterate(self, dir, cid):
165
        """Iterates over a directory and runs the segmentation on each patient found"""
166
        logging.info("Looking for BRATS data directories..")
167
        for fn in os.listdir(dir):
168
            if not os.path.isdir(os.path.join(dir, fn)):
169
                continue  # Not a directory
170
            if "DE_RI" in fn:
171
                logging.info("Found pat data: {}".format(fn))
172
                try:
173
                    os.makedirs(os.path.join(os.path.join(dir, fn), "results"))
174
                except OSError as err:
175
                    if err.errno != errno.EEXIST:
176
                        raise
177
                logging.info("Calling Container: {}".format(cid))
178
                if not self._runContainer(cid, os.path.join(dir, fn), dir):
179
                    logging.info(
180
                        "ERROR: Run failed for patient {} with container {}".format(
181
                            fn, cid
182
                        )
183
                    )
184
                    return False
185
                # TODO: rename folder and prepend pat_id
186
                # rename_folder(img_id, os.path.join(directory, fn), fn)
187
        return True
188
189
    def _multiSegment(self, tempDir, inputs, method, outputName, outputDir):
190
        """
191
        multiSegment [summary]
192
193
        Args:
194
            tempDir ([type]): [description]
195
            inputs ([type]): [description]
196
            method ([type]): [description]
197
            outputName ([type]): [description]
198
            outputDir ([type]): [description]
199
        """
200
        logging.debug("CALLED MULTISEGMENT")
201
        fusion = fusionator.Fusionator()
202
        for cid in self.config.keys():
203
            # replace this with a call to single-segment
204
            logging.info("[Orchestra] Segmenting with " + cid)
205
            ff = self._format(self._getFileFormat(cid), self.fileformats)
206
            for key, img in inputs.items():
207
                savepath = op.join(tempDir, ff[key])
208
                img = oitk.get_itk_image(img)
209
                if self.verbose:
210
                    logging.info(
211
                        "[Weborchestra][Info] Writing to path {}".format(savepath)
212
                    )
213
                oitk.write_itk_image(img, savepath)
214
            if self.verbose:
215
                logging.info("[Weborchestra][Info] Images saved correctly")
216
                logging.info(
217
                    "[Weborchestra][Info] Starting the Segmentation with container {} now".format(
218
                        cid
219
                    )
220
                )
221
222
            status = self._runContainer(cid, tempDir, outputDir)
223
            status = self._runContainer(cid, tempDir, outputDir, outputName)
224
            if status:
225
                if self.verbose:
226
                    logging.info("[Weborchestra][Success] Segmentation saved")
227
                resultsDir = op.join(tempDir, "results/")
228
                saveLocation = op.join(outputDir, cid + "_tumor_seg.nii.gz")
229
                self._handleResult(cid, resultsDir, outputPath=saveLocation)
230
            else:
231
                logging.exception("Container run for CID {} failed!".format(cid))
232
        fusion._dirFuse(
233
            outputDir, method=method, outputPath=op.join(outputDir, outputName)
234
        )
235
236
    def _singleSegment(self, tempDir, inputs, cid, outputName, outputDir):
237
        """
238
        singleSegment [summary]
239
240
        Args:
241
            tempDir ([type]): [description]
242
            inputs ([type]): [description]
243
            cid ([type]): [description]
244
            outputName ([type]): [description]
245
            outputDir ([type]): [description]
246
        """
247
        ff = self._format(self._getFileFormat(cid), self.fileformats)
248
        for key, img in inputs.items():
249
            savepath = op.join(tempDir, ff[key])
250
            img = oitk.get_itk_image(img)
251
            if self.verbose:
252
                logging.info("[Weborchestra][Info] Writing to path {}".format(savepath))
253
            oitk.write_itk_image(img, savepath)
254
        if self.verbose:
255
            logging.info("[Weborchestra][Info] Images saved correctly")
256
            logging.info(
257
                "[Weborchestra][Info] Starting the Segmentation with {} now".format(cid)
258
            )
259
        status = self._runContainer(cid, tempDir, outputDir, outputName)
260
        if status:
261
            if self.verbose:
262
                logging.info("[Weborchestra][Success] Segmentation saved")
263
            resultsDir = op.join(tempDir, "results/")
264
            self._handleResult(
265
                cid, resultsDir, outputPath=op.join(outputDir, outputName)
266
            )
267
            # delete tmp directory if result was saved elsewhere
268
        else:
269
            logging.error("[Weborchestra][Error] Segmentation failed, see output!")
270
271
    def segment(
272
        self, t1=None, t1c=None, t2=None, fla=None, cid="mocker", outputPath=None
273
    ):
274
        """
275
        segment [summary]
276
277
        Args:
278
            t1 ([type], optional): [description]. Defaults to None.
279
            t1c ([type], optional): [description]. Defaults to None.
280
            t2 ([type], optional): [description]. Defaults to None.
281
            fla ([type], optional): [description]. Defaults to None.
282
            cid (str, optional): [description]. Defaults to 'mocker'.
283
            outputPath ([type], optional): [description]. Defaults to None.
284
        """
285
        # Call output method here
286
        outputName, outputDir = self._whereDoesTheFileGo(outputPath, t1, cid)
287
        # set up logging (for all internal functions)
288
        logging.basicConfig(
289
            format="%(asctime)s %(levelname)s:%(message)s",
290
            filename=op.join(outputDir, "segmentor_high_level.log"),
291
            level=logging.DEBUG,
292
        )
293
        logging.getLogger().addHandler(logging.StreamHandler())
294
        logging.debug("DIRNAME is: " + outputDir)
295
        logging.debug("FILENAME is: " + outputName)
296
        logging.info(
297
            "Now running a new set of segmentations on input: {}".format(op.dirname(t1))
298
        )
299
        # switch between
300
        inputs = {"t1": t1, "t2": t2, "t1c": t1c, "fla": fla}
301
        # create temporary directory for storage
302
        storage = tempfile.TemporaryDirectory(dir=self.package_directory)
303
        # TODO this is a potential security hazzard as all users can access the files now, but currently it seems the only way to deal with bad configured docker installations
304
        os.chmod(storage.name, 0o777)
305
        tempDir = op.abspath(storage.name)
306
        resultsDir = op.join(tempDir, "results")
307
        os.mkdir(resultsDir)
308
        # TODO this is a potential security hazzard as all users can access the files now, but currently it seems the only way to deal with bad configured docker installations
309
        os.chmod(resultsDir, 0o777)
310
        logging.debug(tempDir)
311
        logging.debug(resultsDir)
312
313
        if cid == "mav" or cid == "simple" or cid == "all":
314
            # segment with all containers
315
            logging.info("Called singleSegment with method: " + cid)
316
            self._multiSegment(tempDir, inputs, cid, outputName, outputDir)
317
        else:
318
            # segment only with a single container
319
            logging.info("Called singleSegment with docker: " + cid)
320
            self._singleSegment(tempDir, inputs, cid, outputName, outputDir)
321
322
    ### Private utility methods below ###
323
324
    def _whereDoesTheFileGo(self, outputPath, t1path, cid):
325
        if outputPath is None:
326
            outputDir = op.join(op.dirname(t1path), "output")
327
            outputName = cid + "_segmentation.nii.gz"
328
        elif outputPath.endswith(".nii.gz"):
329
            if "~" in outputPath:
330
                outputPath = op.expanduser(outputPath)
331
            # valid filename
332
            outputDir = op.dirname(outputPath)
333
            outputName = op.basename(outputPath)
334
            # if only a filename is passed, use the t1 directory
335
            if outputDir == "":
336
                outputDir = op.join(op.dirname(t1path), "output")
337
        else:
338
            outputDir = outputName = None
339
340
        if outputDir is None or outputName is None:
341
            raise ValueError(
342
                "The outputPath is ambiguous and cannot be determined! path: {}, t1path: {}, cid: {}".format(
343
                    outputPath, t1path, cid
344
                )
345
            )
346
        # build abspaths:
347
        outputDir = op.abspath(outputDir)
348
        try:
349
            os.makedirs(outputDir, exist_ok=True)
350
        except Exception as e:
351
            print("could not create target directory: {}".format(outputDir))
352
            raise e
353
        return outputName, outputDir
354
355
    def _handleResult(self, cid, directory, outputPath):
356
        """
357
        This function handles the copying and renaming of the
358
        Segmentation result before returning
359
        """
360
        # Todo: Find segmentation result
361
        contents = glob.glob(op.join(directory, "tumor_" + cid + "_class.nii*"))
362
        if len(contents) == 0:
363
            contents = glob.glob(op.join(directory, "tumor_*_class.nii*"))
364
        if len(contents) == 0:
365
            contents = glob.glob(op.join(directory, cid + "*.nii*"))
366
        if len(contents) == 0:
367
            contents = glob.glob(op.join(directory, "*tumor*.nii*"))
368
        if len(contents) < 1:
369
            logging.error(
370
                "[Weborchestra - Filehandling][Error] No segmentation saved, the container run has most likely failed."
371
            )
372
        elif len(contents) > 1:
373
            logging.warning(
374
                "[Weborchestra - Filehandling][Warning] Multiple Segmentations found"
375
            )
376
            print("found files: {}".format(contents))
377
            img = oitk.get_itk_image(contents[0])
378
            labels = 0
379
            exportImg = None
380
            for _, c in enumerate(contents):
381
                img = oitk.get_itk_image(c)
382
                if labels < len(np.unique(oitk.get_itk_array(img))):
383
                    exportImg = img
384
                    labels = len(np.unique(oitk.get_itk_array(img)))
385
            oitk.write_itk_image(exportImg, op.join(outputPath))
386
            logging.warning(
387
                "[Weborchestra - Filehandling][Warning] Segmentation with most labels ({}) for cid {} saved".format(
388
                    labels, cid
389
                )
390
            )
391
            return
392
        img = oitk.get_itk_image(contents[0])
393
        for c in contents:
394
            os.remove(op.join(directory, c))
395
        oitk.write_itk_image(img, outputPath)
396
397
    def _format(self, fileformat, configpath, verbose=True):
398
        # load fileformat for a given container
399
        try:
400
            configfile = open(op.abspath(configpath), "r")
401
            config = json.load(configfile)
402
            configfile.close()
403
        except IOError as e:
404
            logging.exception("I/O error({0}): {1}".format(e.errno, e.strerror))
405
            raise
406
        except ValueError:
407
            logging.exception("Invalid configuration file")
408
            raise
409
        except:
410
            logging.exception("Unexpected Error!")
411
            raise
412
        logging.info(
413
            "[Weborchestra][Success]Loaded fileformat: {}".format(config[fileformat])
414
        )
415
        return config[fileformat]