a b/sybil/augmentations.py
1
import cv2
2
import torch
3
import torchvision
4
5
from typing import Literal
6
from abc import ABCMeta, abstractmethod
7
import numpy as np
8
import random
9
10
try:
11
    import albumentations as A
12
except ImportError:
13
    # albumentations is not installed, training with augmentations will not be possible
14
    A = None
15
16
17
def get_augmentations(split: Literal["train", "dev", "test"], args):
    """
    Build the ordered list of augmentations for a dataset split.

    Every split resizes, converts to tensors, forces the channel count and
    normalizes. The "train" split additionally inserts a Rotate_Range
    (configured with deg=20) right after the resize step.

    Args:
        split: name of the dataset split; only "train" enables the rotation.
        args: configuration object forwarded to each augmentation constructor.

    Returns:
        List of augmentation instances, in the order they should be applied.
    """
    # Instances are created in application order, so constructor-time
    # failures surface in the same order as with per-split list literals.
    augmentations = [Scale_2d(args, {})]
    if split == "train":
        augmentations.append(Rotate_Range(args, {"deg": 20}))
    augmentations.extend(
        (
            ToTensor(),
            Force_Num_Chan_Tensor_2d(args, {}),
            Normalize_Tensor_2d(args, {}),
        )
    )
    return augmentations
35
36
37
class Abstract_augmentation(metaclass=ABCMeta):
    """
    Abstract base class for augmentations (transformers).

    Subclasses must implement ``__call__``; because the class uses
    ``ABCMeta`` as its metaclass, instantiating a class that does not
    override it raises ``TypeError``.

    Instances are non-cachable by default; ``set_cachable`` marks an
    instance as cachable and builds its caching key.
    """

    def __init__(self):
        self._is_cachable = False
        # Separators used when building the caching key in set_cachable.
        self._trans_sep = "@"
        self._attr_sep = "#"
        # Name is taken from the default object string: the text after
        # "sybil.augmentations." up to the first space, lowercased.
        self.name = (
            self.__str__().split("sybil.augmentations.")[-1].split(" ")[0].lower()
        )

    @abstractmethod
    def __call__(self, input_dict, sample=None):
        """
        Apply the augmentation.

        Args:
            input_dict: dictionary holding the data to transform.
            sample: optional extra per-sample information.

        Returns:
            Implementations in this module return the (updated) input_dict.
        """
        pass

    def set_seed(self, seed):
        """
        Seed the global ``random``, ``numpy.random`` and ``torch`` generators.
        """
        random.seed(seed)
        np.random.seed(seed)
        torch.random.manual_seed(seed)

    def cachable(self):
        """
        Return True if this transformer has been marked as cachable.
        """
        return self._is_cachable

    def set_cachable(self, *keys):
        """
        Sets the transformer as cachable
        and sets the _caching_keys according to the input variables.

        The key has the form "@<name>#<key1>#<key2>...".
        """
        self._is_cachable = True
        name_str = "{}{}".format(self._trans_sep, self.name)
        keys_str = "".join(self._attr_sep + str(k) for k in keys)
        self._caching_keys = "{}{}".format(name_str, keys_str)
        return

    def caching_keys(self):
        """
        Return the caching key built by ``set_cachable``.

        Raises:
            AttributeError: if ``set_cachable`` has not been called yet.
        """
        return self._caching_keys
78
79
80
class ComposeAug(Abstract_augmentation):
    """
    Chains several augmentations and applies them in order.
    """

    def __init__(self, augmentations):
        super().__init__()
        self.augmentations = augmentations

    def __call__(self, input_dict, sample=None):
        """
        Feed input_dict through every augmentation, passing each one the
        previous one's result together with the same ``sample``.
        """
        result = input_dict
        for augmentation in self.augmentations:
            result = augmentation(result, sample)
        return result
94
95
96
class ToTensor(Abstract_augmentation):
    """
    Converts the numpy "input" (and "mask", when present) into float tensors
    via torch.from_numpy.
    """

    def __init__(self):
        super().__init__()
        self.name = "totensor"

    def __call__(self, input_dict, sample=None):
        """
        Replace input_dict["input"] with a float tensor; do the same for
        input_dict["mask"] when it exists and is not None.
        """
        image_tensor = torch.from_numpy(input_dict["input"])
        input_dict["input"] = image_tensor.float()

        mask = input_dict.get("mask")
        if mask is not None:
            input_dict["mask"] = torch.from_numpy(mask).float()
        return input_dict
110
111
112
class ResizeTransform:
    """
    Callable that resizes an image and/or a mask to a fixed (width, height)
    using cv2.resize.

    The image is resized with INTER_LINEAR interpolation and the mask with
    INTER_NEAREST.
    """

    def __init__(self, width, height):
        self.width = width
        self.height = height

    def __call__(self, image=None, mask=None):
        """
        Resize whichever of ``image`` / ``mask`` is given.

        Returns:
            dict with keys "image" and "mask"; an entry is None when the
            corresponding argument was None.
        """
        target_size = (self.width, self.height)
        resized_image = (
            None
            if image is None
            else cv2.resize(image, dsize=target_size, interpolation=cv2.INTER_LINEAR)
        )
        resized_mask = (
            None
            if mask is None
            else cv2.resize(mask, dsize=target_size, interpolation=cv2.INTER_NEAREST)
        )
        return {"image": resized_image, "mask": resized_mask}
124
125
126
class Scale_2d(Abstract_augmentation):
    """
    Resizes the "input" image (and the "mask", if any) to args.img_size
    (can use for down sampling / keep full res).

    The transformer is cachable; its caching key includes width and height.
    """

    def __init__(self, args, kwargs):
        super().__init__()
        assert len(kwargs.keys()) == 0
        target_width, target_height = args.img_size
        self.set_cachable(target_width, target_height)
        self.transform = ResizeTransform(target_width, target_height)

    def __call__(self, input_dict, sample=None):
        """
        Resize input_dict["input"] and input_dict["mask"] in place.

        The "mask" key is always written; it is None when no mask was given.
        """
        resized = self.transform(
            image=input_dict["input"], mask=input_dict.get("mask")
        )
        input_dict["input"] = resized["image"]
        input_dict["mask"] = resized["mask"]
        return input_dict
146
147
148
class Rotate_Range(Abstract_augmentation):
    """
    Randomly rotates the image (and mask) using albumentations' Rotate, see
    https://albumentations.ai/docs/api_reference/augmentations/geometric/rotate/#albumentations.augmentations.geometric.rotate.Rotate

        kwargs
            deg: max degrees to rotate

    The underlying transform is created with limit=deg and p=0.5.
    """

    def __init__(self, args, kwargs):
        super().__init__()
        assert len(kwargs.keys()) == 1
        self.max_angle = int(kwargs["deg"])
        # albumentations is an optional import at module level (A is None
        # when it is missing), so fail here with a clear message.
        assert A is not None, "albumentations is not installed"
        self.transform = A.Rotate(limit=self.max_angle, p=0.5)

    def __call__(self, input_dict, sample=None):
        """
        Rotate input_dict["input"] and input_dict["mask"].

        When ``sample`` carries a "seed", the global RNGs are re-seeded with
        it before the transform is applied. The "mask" key is always written
        from the transform's output.
        """
        has_seed = bool(sample) and "seed" in sample
        if has_seed:
            self.set_seed(sample["seed"])

        rotated = self.transform(
            image=input_dict["input"], mask=input_dict.get("mask")
        )
        input_dict["input"] = rotated["image"]
        input_dict["mask"] = rotated["mask"]
        return input_dict
172
173
174
class Normalize_Tensor_2d(Abstract_augmentation):
    """
    Normalizes input by channel
    (wrapper for torchvision.transforms.Normalize).

    Reads ``args.img_mean`` / ``args.img_std`` for the per-channel
    statistics and ``args.img_file_type`` to decide whether the image must
    be permuted around the normalization.
    """

    def __init__(self, args, kwargs):
        super().__init__()
        assert len(kwargs) == 0
        # Flatten the statistics to a 1-D tensor. This accepts a scalar, a
        # length-1 sequence or a per-channel sequence alike, and avoids
        # building a nested (2-D) tensor for the single-channel case.
        stats_dtype = torch.get_default_dtype()
        channel_means = torch.as_tensor(args.img_mean, dtype=stats_dtype).reshape(-1)
        channel_stds = torch.as_tensor(args.img_std, dtype=stats_dtype).reshape(-1)

        self.transform = torchvision.transforms.Normalize(channel_means, channel_stds)

        # Only "png" inputs are permuted before/after normalization.
        self.permute = args.img_file_type in [
            "png",
        ]

    def __call__(self, input_dict, sample=None):
        """
        Normalize input_dict["input"] and store the result back.

        A 2-D image first gets a leading axis added. For permuted file
        types the axes are rotated with permute(2, 0, 1) before normalizing
        and restored with permute(1, 2, 0) afterwards.
        NOTE(review): the permute path presumably expects a channels-last
        image at this point; confirm against the upstream pipeline.
        """
        img = input_dict["input"]
        if len(img.size()) == 2:
            img = img.unsqueeze(0)

        if self.permute:
            img = img.permute(2, 0, 1)
            input_dict["input"] = self.transform(img).permute(1, 2, 0)
        else:
            input_dict["input"] = self.transform(img)

        return input_dict
206
207
208
class Force_Num_Chan_Tensor_2d(Abstract_augmentation):
    """
    Convert gray scale images to image with args.num_chan num channels.

    The mask, when present, gets a leading axis added via unsqueeze(0).
    """

    def __init__(self, args, kwargs):
        super().__init__()
        assert len(kwargs) == 0
        self.args = args

    def __call__(self, input_dict, sample=None):
        """
        Ensure input_dict["input"] has a leading channel axis of size
        args.num_chan.

        A 2-D image first gets a leading axis; if the leading size then
        differs from args.num_chan the tensor is expanded along it. The
        result is always written back, so a 2-D image also gains its
        channel axis when args.num_chan == 1.

        NOTE: Tensor.expand only broadcasts size-1 dimensions, so an image
        whose leading size is neither 1 nor args.num_chan makes torch raise.
        """
        img = input_dict["input"]
        mask = input_dict.get("mask")
        if mask is not None:
            input_dict["mask"] = mask.unsqueeze(0)

        if len(img.shape) == 2:
            img = img.unsqueeze(0)

        if img.size()[0] != self.args.num_chan:
            img = img.expand(self.args.num_chan, *img.size()[1:])

        # Write back unconditionally so the added channel axis is not lost
        # when the channel count already matches.
        input_dict["input"] = img
        return input_dict