# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import warnings
from collections import OrderedDict

import cv2
import mmcv
import numpy as np
import torch
from mmcv.utils import print_log
from PIL import Image
from prettytable import PrettyTable
from torch.utils.data import Dataset

from mmseg.core import eval_metrics, intersect_and_union, pre_eval_to_metrics
from mmseg.utils import get_root_logger
from .builder import DATASETS
from .pipelines import Compose, LoadAnnotations


@DATASETS.register_module()
class CustomDataset(Dataset):
"""Custom dataset for semantic segmentation. An example of file structure
is as followed.
.. code-block:: none
├── data
│ ├── my_dataset
│ │ ├── img_dir
│ │ │ ├── train
│ │ │ │ ├── xxx{img_suffix}
│ │ │ │ ├── yyy{img_suffix}
│ │ │ │ ├── zzz{img_suffix}
│ │ │ ├── val
│ │ ├── ann_dir
│ │ │ ├── train
│ │ │ │ ├── xxx{seg_map_suffix}
│ │ │ │ ├── yyy{seg_map_suffix}
│ │ │ │ ├── zzz{seg_map_suffix}
│ │ │ ├── val
The img/gt_semantic_seg pair of CustomDataset should be of the same
except suffix. A valid img/gt_semantic_seg filename pair should be like
``xxx{img_suffix}`` and ``xxx{seg_map_suffix}`` (extension is also included
in the suffix). If split is given, then ``xxx`` is specified in txt file.
Otherwise, all files in ``img_dir/``and ``ann_dir`` will be loaded.
Please refer to ``docs/tutorials/new_dataset.md`` for more details.
Args:
pipeline (list[dict]): Processing pipeline
img_dir (str): Path to image directory
img_suffix (str): Suffix of images. Default: '.jpg'
ann_dir (str, optional): Path to annotation directory. Default: None
seg_map_suffix (str): Suffix of segmentation maps. Default: '.png'
        split (str, optional): Split txt file. If split is specified, only
            the files listed in it will be loaded. Otherwise, all images in
            img_dir/ann_dir will be loaded. Default: None
data_root (str, optional): Data root for img_dir/ann_dir. Default:
None.
        test_mode (bool): If test_mode=True, ground truth will not be loaded.
        ignore_index (int): The label index to be ignored. Default: 255
        reduce_zero_label (bool): Whether to mark label zero as ignored.
            Default: False
        keep_empty_prob (float): Relative sampling weight given to images
            whose ground truth is empty when ``multi_label`` is True; with
            the default of 1.0 no resampling is performed. Default: 1.0
        classes (str | Sequence[str], optional): Specify classes to load.
            If None, ``cls.CLASSES`` will be used. Default: None.
        palette (Sequence[Sequence[int]] | np.ndarray | None):
            The palette of segmentation map. If None is given, and
            self.PALETTE is None, a random palette will be generated.
            Default: None
        gt_seg_map_loader_cfg (dict, optional): Config used to build the
            LoadAnnotations transform that loads ground truth for evaluation;
            loads from disk by default. Default: None.
        multi_label (bool): Whether the dataset uses multi-channel
            (multi-label) ground truth; affects resampling and evaluation.
            Default: False
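
    Example:
        A minimal, config-style instantiation (the paths, pipeline entries
        and class names below are illustrative assumptions, not values
        required by the dataset):

        .. code-block:: python

            dataset = CustomDataset(
                pipeline=[
                    dict(type='LoadImageFromFile'),
                    dict(type='LoadAnnotations'),
                ],
                img_dir='data/my_dataset/img_dir/train',
                ann_dir='data/my_dataset/ann_dir/train',
                img_suffix='.jpg',
                seg_map_suffix='.png',
                classes=('background', 'foreground'),
                palette=[[0, 0, 0], [255, 255, 255]])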
"""
CLASSES = None
PALETTE = None
def __init__(self,
pipeline,
img_dir,
img_suffix='.jpg',
ann_dir=None,
seg_map_suffix='.png',
split=None,
data_root=None,
test_mode=False,
ignore_index=255,
reduce_zero_label=False,
keep_empty_prob=1.,
classes=None,
palette=None,
gt_seg_map_loader_cfg=None,
multi_label=False):
        self.multi_label = multi_label
        # Split the pipeline around an optional Mosaic transform: transforms
        # before it are used to load each sub-image, transforms after it are
        # applied to the composed mosaic.
        mosaic_flags = [t['type'] == 'Mosaic' for t in pipeline]
        self.use_mosaic = any(mosaic_flags)
        if self.use_mosaic:
            assert sum(mosaic_flags) == 1, \
                'only one Mosaic transform is allowed in the pipeline'
            mosaic_at = mosaic_flags.index(True)
            mosaic = pipeline[mosaic_at]
            self.mosaic_prob = mosaic.get('p', 0.5)
            self.mosaic_center = mosaic.get('center', (0.25, 0.75))
            self.load_pipeline = Compose(pipeline[:mosaic_at])
            self.pipeline = Compose(pipeline[mosaic_at + 1:])
        else:
            self.pipeline = Compose(pipeline)
self.img_dir = img_dir
self.img_suffix = img_suffix
self.ann_dir = ann_dir
self.seg_map_suffix = seg_map_suffix
self.split = split
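        # some configs pass the literal string "None" for ``split``; treat it
        # as if no split file were given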
if self.split == "None":
self.split = None
self.data_root = data_root
self.test_mode = test_mode
self.ignore_index = ignore_index
self.reduce_zero_label = reduce_zero_label
self.label_map = None
self.CLASSES, self.PALETTE = self.get_classes_and_palette(
classes, palette)
self.gt_seg_map_loader = LoadAnnotations(
) if gt_seg_map_loader_cfg is None else LoadAnnotations(
**gt_seg_map_loader_cfg)
if test_mode:
assert self.CLASSES is not None, \
'`cls.CLASSES` or `classes` should be specified when testing'
# join paths if data_root is specified
if self.data_root is not None:
if not osp.isabs(self.img_dir):
self.img_dir = osp.join(self.data_root, self.img_dir)
if not (self.ann_dir is None or osp.isabs(self.ann_dir)):
self.ann_dir = osp.join(self.data_root, self.ann_dir)
if not (self.split is None or osp.isabs(self.split)):
self.split = osp.join(self.data_root, self.split)
# load annotations
self.img_infos = self.load_annotations(self.img_dir, self.img_suffix,
self.ann_dir,
self.seg_map_suffix, self.split)
self.keep_empty_prob = keep_empty_prob
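        # precompute per-sample resampling probabilities; only effective when
        # ``multi_label`` is True and ``keep_empty_prob`` != 1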
self.get_resample_weight()
def __len__(self):
"""Total number of samples of data."""
return len(self.img_infos)
def load_annotations(self, img_dir, img_suffix, ann_dir, seg_map_suffix,
split):
"""Load annotation from directory.
Args:
img_dir (str): Path to image directory
img_suffix (str): Suffix of images.
ann_dir (str|None): Path to annotation directory.
seg_map_suffix (str|None): Suffix of segmentation maps.
            split (str|None): Split txt file. If split is specified, only the
                files listed in it will be loaded. Otherwise, all images in
                img_dir/ann_dir will be loaded. Default: None
Returns:
list[dict]: All image info of dataset.
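
        Example:
            A split file simply lists one sample stem per line (the suffixes
            are appended by the loader); e.g. a hypothetical ``train.txt``::

                xxx
                yyy
                zzz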
"""
img_infos = []
if split is not None:
with open(split) as f:
for line in f:
img_name = line.strip()
img_info = dict(filename=img_name + img_suffix)
if ann_dir is not None:
seg_map = img_name + seg_map_suffix
img_info['ann'] = dict(seg_map=seg_map)
img_infos.append(img_info)
else:
for img in mmcv.scandir(img_dir, img_suffix, recursive=True):
img_info = dict(filename=img)
if ann_dir is not None:
seg_map = img.replace(img_suffix, seg_map_suffix)
img_info['ann'] = dict(seg_map=seg_map)
img_infos.append(img_info)
img_infos = sorted(img_infos, key=lambda x: x['filename'])
print_log(f'Loaded {len(img_infos)} images', logger=get_root_logger())
return img_infos
def get_ann_info(self, idx):
"""Get annotation by index.
Args:
idx (int): Index of data.
Returns:
dict: Annotation info of specified index.
"""
return self.img_infos[idx]['ann']
def pre_pipeline(self, results):
"""Prepare results dict for pipeline."""
results['seg_fields'] = []
results['img_prefix'] = self.img_dir
results['seg_prefix'] = self.ann_dir
if self.custom_classes:
results['label_map'] = self.label_map
def __getitem__(self, idx):
"""Get training/test data after pipeline.
Args:
idx (int): Index of data.
Returns:
dict: Training/test data (with annotation if `test_mode` is set
False).
"""
        if self.test_mode:
            return self.prepare_test_img(idx)
        else:
            # resample() is a no-op unless resampling probabilities were
            # precomputed by get_resample_weight().
            idx = self.resample(idx)
            return self.prepare_train_img(idx)
def prepare_train_img(self, idx):
"""Get training data and annotations after pipeline.
Args:
idx (int): Index of data.
Returns:
dict: Training data and annotation after pipeline with new keys
introduced by pipeline.
"""
        def random_crop(img, w, h):
            # Sample a random w x h window inside ``img`` and return its
            # bounds as (x1, x2, y1, y2).
            x = int(np.random.randint(0, img.shape[1] - w + 1))
            y = int(np.random.randint(0, img.shape[0] - h + 1))
            return x, x + w, y, y + h

        if self.use_mosaic and np.random.rand() < self.mosaic_prob:
            # Compose a 2x2 mosaic from this sample and three randomly
            # chosen ones.
            idxes = [idx] + np.random.randint(0, len(self), 3).tolist()
            results_list = []
            for idx in idxes:
                img_info = self.img_infos[idx]
                ann_info = self.get_ann_info(idx)
                results = dict(img_info=img_info, ann_info=ann_info)
                self.pre_pipeline(results)
                results_list.append(self.load_pipeline(results))
            results = results_list[0]
            # Start from an empty canvas; segmentation fields are filled with
            # the ignore value 255 before the crops are pasted in.
            img = np.zeros_like(results['img'])
            masks = {
                k: np.full_like(results[k], 255)
                for k in results['seg_fields']
            }
            # The mosaic centre splits the canvas into four quadrants.
            center_x = int(
                round(np.random.uniform(*self.mosaic_center) * img.shape[1]))
            center_y = int(
                round(np.random.uniform(*self.mosaic_center) * img.shape[0]))
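            # The composed mosaic is laid out as four quadrants around
            # (center_x, center_y):
            #
            #   top-left:     sample 0    top-right:    sample 1
            #   bottom-left:  sample 2    bottom-right: sample 3
            #
            # Each quadrant is filled with a random crop of the matching size
            # taken from the corresponding loaded sample, for both the image
            # and every field in ``seg_fields``.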
            for i in range(4):
                res = results_list[i]
                if i == 0:  # top-left quadrant
                    x1, x2, y1, y2 = random_crop(
                        res['img'], center_x, center_y)
                    img[:center_y, :center_x] = res['img'][y1:y2, x1:x2]
                    for k in masks:
                        masks[k][:center_y, :center_x] = res[k][y1:y2, x1:x2]
                elif i == 1:  # top-right quadrant
                    x1, x2, y1, y2 = random_crop(
                        res['img'], img.shape[1] - center_x, center_y)
                    img[:center_y, center_x:] = res['img'][y1:y2, x1:x2]
                    for k in masks:
                        masks[k][:center_y, center_x:] = res[k][y1:y2, x1:x2]
                elif i == 2:  # bottom-left quadrant
                    x1, x2, y1, y2 = random_crop(
                        res['img'], center_x, img.shape[0] - center_y)
                    img[center_y:, :center_x] = res['img'][y1:y2, x1:x2]
                    for k in masks:
                        masks[k][center_y:, :center_x] = res[k][y1:y2, x1:x2]
                else:  # bottom-right quadrant
                    x1, x2, y1, y2 = random_crop(
                        res['img'], img.shape[1] - center_x,
                        img.shape[0] - center_y)
                    img[center_y:, center_x:] = res['img'][y1:y2, x1:x2]
                    for k in masks:
                        masks[k][center_y:, center_x:] = res[k][y1:y2, x1:x2]
results['img'] = img
for k in masks:
results[k] = masks[k]
return self.pipeline(results)
elif self.use_mosaic:
img_info = self.img_infos[idx]
ann_info = self.get_ann_info(idx)
results = dict(img_info=img_info, ann_info=ann_info)
self.pre_pipeline(results)
return self.pipeline(self.load_pipeline(results))
else:
img_info = self.img_infos[idx]
ann_info = self.get_ann_info(idx)
results = dict(img_info=img_info, ann_info=ann_info)
self.pre_pipeline(results)
return self.pipeline(results)
def prepare_test_img(self, idx):
"""Get testing data after pipeline.
Args:
idx (int): Index of data.
Returns:
dict: Testing data after pipeline with new keys introduced by
pipeline.
"""
img_info = self.img_infos[idx]
ann_info = self.get_ann_info(idx)
results = dict(img_info=img_info, ann_info=ann_info)
self.pre_pipeline(results)
return self.pipeline(results)
    def format_results(self, results, imgfile_prefix=None, indices=None,
                       **kwargs):
        """Format the results into dir (standard format for Cityscapes
        evaluation).

        Args:
            results (list): Testing results of the dataset.
            imgfile_prefix (str | None): The prefix of image files. It
                includes the file path and the prefix of filename, e.g.,
                "a/b/prefix". Default: None.
            indices (list[int] | None): Indices of the images the results
                correspond to. Default: None.

        Returns:
            list[str]: Paths of the written result files.
        """
        result_files = []
        for res, idx in zip(results, indices):
            filename = self.img_infos[idx]['filename']
            result_file = osp.join(imgfile_prefix,
                                   osp.splitext(filename)[0] + '.png')
            os.makedirs(osp.dirname(result_file), exist_ok=True)
            if res.ndim == 3 and not np.issubdtype(res.dtype, np.integer):
                # floating-point, per-class maps are stored as 16-bit PNGs
                cv2.imwrite(result_file, (res * 65535).astype(np.uint16))
            else:
                # integer label maps are stored as 8-bit PNGs
                Image.fromarray(res.astype(np.uint8)).save(result_file)
            result_files.append(result_file)
        return result_files
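
    # Illustrative call of ``format_results`` (the output prefix below is an
    # assumption, not a required path):
    #
    #     dataset.format_results(results, imgfile_prefix='work_dirs/preds',
    #                            indices=list(range(len(dataset))))
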
    def get_resample_weight(self):
        """Precompute per-sample resampling probabilities.

        When ``multi_label`` is True and ``keep_empty_prob`` != 1, samples
        with an empty ground truth are weighted by ``keep_empty_prob``
        relative to non-empty samples; otherwise no resampling is used.
        """
        if self.multi_label and self.keep_empty_prob != 1:
            probs = []
            for idx in range(len(self)):
                gt = self.get_gt_seg_map_by_idx(idx)
                probs.append(self.keep_empty_prob if gt.sum() == 0 else 1)
            probs = np.array(probs, dtype=np.float64)
            probs /= probs.sum()
        else:
            probs = None
        self.probs = probs
    def resample(self, idx):
        """Optionally replace ``idx`` by a weighted random index."""
        if self.probs is None:
            return idx
        return np.random.choice(len(self), p=self.probs)
def get_gt_seg_map_by_idx(self, index):
"""Get one ground truth segmentation map for evaluation."""
ann_info = self.get_ann_info(index)
results = dict(ann_info=ann_info)
self.pre_pipeline(results)
self.gt_seg_map_loader(results)
return results['gt_semantic_seg']
def get_gt_seg_maps(self, efficient_test=None):
"""Get ground truth segmentation maps for evaluation."""
if efficient_test is not None:
warnings.warn(
'DeprecationWarning: ``efficient_test`` has been deprecated '
'since MMSeg v0.16, the ``get_gt_seg_maps()`` is CPU memory '
'friendly by default. ')
for idx in range(len(self)):
ann_info = self.get_ann_info(idx)
results = dict(ann_info=ann_info)
self.pre_pipeline(results)
self.gt_seg_map_loader(results)
yield results['gt_semantic_seg']
    def pre_eval(self, preds, loss, indices):
        """Collect eval result from each iteration.

        Args:
            preds (list[torch.Tensor] | torch.Tensor): The segmentation
                predictions of shape (N, H, W) (label maps after argmax), or
                per-class maps when ``multi_label`` is True.
            loss: Loss value associated with these predictions; it is
                appended to every per-image result.
            indices (list[int] | int): the prediction related ground truth
                indices.

        Returns:
            list: Per-image results, each containing (area_intersect,
                area_union, area_prediction, area_ground_truth) followed by
                the loss.
        """
        # To be compatible with batch inference
if not isinstance(indices, list):
indices = [indices]
if not isinstance(preds, list):
preds = [preds]
pre_eval_results = []
for pred, index in zip(preds, indices):
seg_map = self.get_gt_seg_map_by_idx(index)
            if self.multi_label:
                # Evaluate every class channel as an independent binary
                # segmentation task (2 classes) and keep only the foreground
                # entry of each returned measure.
                per_class = [
                    intersect_and_union(pred[..., i], seg_map[..., i], 2,
                                        self.ignore_index, self.label_map,
                                        self.reduce_zero_label)
                    for i in range(len(self.CLASSES))
                ]
                pre_eval_results.append(
                    tuple(
                        torch.stack([res[j] for res in per_class], 0)[:, 1]
                        for j in range(len(per_class[0]))))
else:
pre_eval_results.append(
intersect_and_union(pred, seg_map, len(self.CLASSES),
self.ignore_index, self.label_map,
self.reduce_zero_label))
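        # Attach the per-image loss so it can be aggregated and reported
        # alongside the intersect/union measures.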
for i in range(len(pre_eval_results)):
pre_eval_results[i] = list(pre_eval_results[i])
pre_eval_results[i].append(loss)
return pre_eval_results
def get_classes_and_palette(self, classes=None, palette=None):
"""Get class names of current dataset.
Args:
classes (Sequence[str] | str | None): If classes is None, use
default CLASSES defined by builtin dataset. If classes is a
string, take it as a file name. The file contains the name of
classes where each line contains one class name. If classes is
a tuple or list, override the CLASSES defined by the dataset.
            palette (Sequence[Sequence[int]] | np.ndarray | None):
The palette of segmentation map. If None is given, random
palette will be generated. Default: None
"""
if classes is None:
self.custom_classes = False
return self.CLASSES, self.PALETTE
self.custom_classes = True
if isinstance(classes, str):
# take it as a file path
class_names = mmcv.list_from_file(classes)
elif isinstance(classes, (tuple, list)):
class_names = classes
else:
raise ValueError(f'Unsupported type {type(classes)} of classes.')
if self.CLASSES:
if not set(class_names).issubset(self.CLASSES):
raise ValueError('classes is not a subset of CLASSES.')
# dictionary, its keys are the old label ids and its values
# are the new label ids.
# used for changing pixel labels in load_annotations.
self.label_map = {}
for i, c in enumerate(self.CLASSES):
if c not in class_names:
self.label_map[i] = -1
else:
self.label_map[i] = class_names.index(c)
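            # Worked example of the remapping (hypothetical values): with
            # CLASSES = ('a', 'b', 'c') and classes=('c', 'a'), label_map
            # becomes {0: 1, 1: -1, 2: 0}; class 'b' is dropped (-1) and the
            # remaining ids are renumbered to follow the requested order.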
palette = self.get_palette_for_custom_classes(class_names, palette)
return class_names, palette
    def get_palette_for_custom_classes(self, class_names, palette=None):
        """Return a palette for ``class_names``: the matching subset of
        ``self.PALETTE`` when classes were remapped, otherwise the given
        palette, ``self.PALETTE``, or a randomly generated one."""
if self.label_map is not None:
# return subset of palette
palette = []
for old_id, new_id in sorted(
self.label_map.items(), key=lambda x: x[1]):
if new_id != -1:
palette.append(self.PALETTE[old_id])
palette = type(self.PALETTE)(palette)
elif palette is None:
if self.PALETTE is None:
palette = np.random.randint(0, 255, size=(len(class_names), 3))
else:
palette = self.PALETTE
return palette
def evaluate(self,
results,
metric='mIoU',
logger=None,
gt_seg_maps=None,
**kwargs):
"""Evaluate the dataset.
Args:
            results (list[tuple[torch.Tensor]] | list[str]): Per-image
                pre_eval results or predicted segmentation maps used to
                compute the evaluation metric.
metric (str | list[str]): Metrics to be evaluated. 'mIoU',
'mDice' and 'mFscore' are supported.
logger (logging.Logger | None | str): Logger used for printing
related information during evaluation. Default: None.
gt_seg_maps (generator[ndarray]): Custom gt seg maps as input,
used in ConcatDataset
Returns:
dict[str, float]: Default metrics.
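
        Example:
            A usage sketch; ``results`` is assumed to be the list of
            per-image outputs collected by the test loop.

            .. code-block:: python

                metrics = dataset.evaluate(results, metric=['mIoU'])
                print(metrics['mIoU'])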
"""
if isinstance(metric, str):
metric = [metric]
allowed_metrics = ['mIoU', 'mDice', 'mFscore', 'imDice', 'imIoU', 'imFscore']
if not set(metric).issubset(set(allowed_metrics)):
raise KeyError('metric {} is not supported'.format(metric))
eval_results = {}
# test a list of files
if mmcv.is_list_of(results, np.ndarray) or mmcv.is_list_of(
results, str):
if gt_seg_maps is None:
gt_seg_maps = self.get_gt_seg_maps()
num_classes = len(self.CLASSES)
ret_metrics = eval_metrics(
results,
gt_seg_maps,
num_classes,
self.ignore_index,
metric,
label_map=self.label_map,
reduce_zero_label=self.reduce_zero_label)
# test a list of pre_eval_results
else:
ret_metrics = pre_eval_to_metrics(results, metric)
        # Per-class results need class names; fall back to class indices if
        # dataset.CLASSES is not defined.
        if self.CLASSES is None:
            class_names = tuple(range(num_classes))
        else:
            class_names = self.CLASSES
# summary table
        ret_metrics_summary = OrderedDict({
            ret_metric: (np.round(np.nanmean(ret_metric_value) * 100, 2)
                         if 'loss' not in ret_metric else
                         np.round(np.nanmean(ret_metric_value), 5))
            for ret_metric, ret_metric_value in ret_metrics.items()
        })
# each class table
ret_metrics.pop('aAcc', None)
ret_metrics.pop('fwIoU', None)
for key in list(ret_metrics.keys()):
if "loss" in key:
ret_metrics.pop(key, None)
ret_metrics_class = OrderedDict({
ret_metric: np.round(ret_metric_value * 100, 2)
for ret_metric, ret_metric_value in ret_metrics.items()
})
ret_metrics_class.update({'Class': class_names})
ret_metrics_class.move_to_end('Class', last=False)
# for logger
class_table_data = PrettyTable()
for key, val in ret_metrics_class.items():
class_table_data.add_column(key, val)
        summary_table_data = PrettyTable()
        for key, val in ret_metrics_summary.items():
            if key in ('aAcc', 'fwIoU') or 'loss' in key:
                summary_table_data.add_column(key, [val])
            else:
                summary_table_data.add_column('m' + key, [val])
print_log('per class results:', logger)
print_log('\n' + class_table_data.get_string(), logger=logger)
print_log('Summary:', logger)
print_log('\n' + summary_table_data.get_string(), logger=logger)
        # each metric dict
        for key, value in ret_metrics_summary.items():
            if 'loss' in key:
                eval_results[key] = value
            elif key in ('aAcc', 'fwIoU'):
                eval_results[key] = value / 100.0
            else:
                eval_results['m' + key] = value / 100.0
ret_metrics_class.pop('Class', None)
for key, value in ret_metrics_class.items():
eval_results.update({
key + '.' + str(name): value[idx] / 100.0
for idx, name in enumerate(class_names)
})
return eval_results