[77dc1e]: / datasets.py

Download this file

122 lines (99 with data), 4.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import numpy as np
import pandas as pd
from PIL import Image, ImageFile
import os
import torch
from torch.utils.data import Dataset, Sampler
import sys
# Make the project package importable by adding its parent directory to the
# module search path. NOTE: this is a machine-specific absolute path.
sys.path.append("/home/anjum/PycharmProjects/kaggle")
# sys.path.append("/home/anjum/rsna_code") # GCP
from rsna_intracranial_hemorrhage_detection.data_prep import linear_windowing, sigmoid_windowing
# Let PIL load image files whose data is truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Root directory that holds the competition CSV files and the "png"/"npy"
# image folders read by ICHDataset. NOTE: machine-specific absolute path.
data_path = "/mnt/storage_dimm2/kaggle_data/rsna-intracranial-hemorrhage-detection/"
# data_path = "/home/anjum/rsna_data/" # GCP
class ICHDataset(Dataset):
    """Dataset of windowed CT images and hemorrhage labels read from ``data_path``.

    The label CSV of the chosen split is loaded, de-duplicated and pivoted so
    that each row is one image (the first 12 characters of the ``ID`` column)
    and each column is one diagnosis (the part of ``ID`` from character 13 on).

    :param dataset: Split to load: ``"train"``, ``"test1"`` or ``"test2"``.
    :param phase: Selects the label columns: ``0`` keeps every pivoted column,
        ``1`` keeps only ``"any"`` (as a single column), any other value keeps
        the five hemorrhage subtype columns.
    :param image_filter: Optional index labels passed to ``DataFrame.loc`` to
        restrict the dataset to a subset of image IDs.
    :param transforms: Optional callable applied to every loaded image.
    :param image_folder: Optional sub-folder below the split's image
        directory. ``None`` means the images live directly in the split
        directory.
    :param png: If truthy, read ``.png`` files from the ``png`` tree;
        otherwise read ``.npy`` arrays from the ``npy`` tree.
    :raises KeyError: If ``dataset`` is not one of the known splits.
    """

    # CSV file (relative to ``data_path``) that lists the images of each split.
    _CSV_FILES = {
        "train": "stage_1_train.csv",
        "test1": "stage_1_sample_submission.csv",
        "test2": "stage_2_sample_submission.csv",
    }
    # Image directory of each split below the "png" / "npy" folder.
    _SPLIT_DIRS = {
        "train": "train",
        "test1": "test_stage_1",
        "test2": "test_stage_2",
    }
    # Label columns used when ``phase`` is neither 0 nor 1.
    _SUBTYPES = ["epidural", "intraparenchymal", "intraventricular", "subarachnoid", "subdural"]

    # Window passed to ``linear_windowing``; labelled "Subdural window" by the
    # original author. Other settings that were tried: (80, 40) "Brain window"
    # and (130, 50). Override on the class or an instance to change it.
    window_width = 200
    window_length = 80

    def __init__(self, dataset, phase=1, image_filter=None, transforms=None, image_folder=None, png=True):
        self.png = png
        self.dataset = dataset
        self.phase = phase
        self.transforms = transforms
        self.image_dir = self._resolve_image_dir(data_path, dataset, png, image_folder)

        csv_path = os.path.join(data_path, self._CSV_FILES[dataset])
        self.df = pd.read_csv(csv_path).drop_duplicates()
        # Split the "ID" column into the image identifier and the diagnosis name.
        self.df['ImageID'] = self.df['ID'].str.slice(stop=12)
        self.df['Diagnosis'] = self.df['ID'].str.slice(start=13)
        self.df_pivot = self.df.pivot(index="ImageID", columns="Diagnosis", values="Label")
        if image_filter is not None:
            self.df_pivot = self.df_pivot.loc[image_filter]

        self.labels = self._select_labels(self.df_pivot, phase)
        self.image_ids = self.df_pivot.index.values
        # Per-column mean of the labels (the positive rate for 0/1 labels).
        self.class_weights = np.mean(self.labels, axis=0)

    @classmethod
    def _resolve_image_dir(cls, root, dataset, png, image_folder):
        """Return the directory that holds the image files of ``dataset``."""
        if dataset not in cls._SPLIT_DIRS:
            raise KeyError(
                "Unknown dataset {!r}; expected one of {}".format(dataset, sorted(cls._SPLIT_DIRS))
            )
        parts = [root, "png" if png else "npy", cls._SPLIT_DIRS[dataset]]
        # os.path.join() rejects None, so only append a sub-folder when given.
        if image_folder is not None:
            parts.append(image_folder)
        return os.path.join(*parts)

    @classmethod
    def _select_labels(cls, df_pivot, phase):
        """Return the label array that ``phase`` selects from the pivoted frame."""
        if phase == 0:
            return df_pivot.values
        if phase == 1:
            return df_pivot["any"].values.reshape(-1, 1)
        return df_pivot[cls._SUBTYPES].values

    def load_image(self, image_name):
        """Load one image by ID and pass it through ``linear_windowing``.

        :param image_name: Image ID, i.e. the file name without its extension.
        :return: The result of ``linear_windowing`` (a project helper), or an
            all-zero ``(512, 512, 3)`` uint8 array when a ``.npy`` array has a
            zero-length first or second axis.
        """
        if self.png:
            png_file = os.path.join(self.image_dir, image_name + ".png")
            # The context manager closes the file handle as soon as the pixel
            # data has been copied into the array.
            with Image.open(png_file) as handle:
                img = np.array(handle.convert("RGB"))
            return linear_windowing(img, self.window_width, self.window_length)

        img = np.load(os.path.join(self.image_dir, image_name + ".npy"))
        # Original author's note: PIL doesn't work with 16-bit RGB images, which
        # should be ok since the useful HU interval is between 0-255.
        if img.shape[0] == 0 or img.shape[1] == 0:
            # Empty array on disk: substitute a blank image.
            return np.zeros(shape=(512, 512, 3), dtype=np.uint8)
        return linear_windowing(img, self.window_width, self.window_length)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        The label is a float32 tensor. For the ``"train"`` split it holds the
        selected label row; for the other splits it is the placeholder ``[0]``.
        """
        img = self.load_image(self.image_ids[idx])
        if self.transforms is not None:
            img = self.transforms(img)

        if self.dataset == "train":
            return img, torch.tensor(self.labels[idx], dtype=torch.float32)
        return img, torch.tensor([0], dtype=torch.float32)

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_ids)
class BalancedRandomSampler(Sampler):
    """Sampler that yields a shuffled 50:50 mix of positive and negative indices.

    A sample is positive when the first label column equals 1 and negative
    when it equals 0. On every iteration all samples of the smaller class are
    used together with an equally sized random subset (drawn without
    replacement) of the larger class. In the usual case of more negatives than
    positives this means every positive is used and the negatives are
    subsampled.
    """

    def __init__(self, data_source):
        """
        :param data_source: An ICHDataset (any object with a 2-D ``labels`` array works)
        """
        # Compatibility shim: the base-class signature is believed to differ
        # between PyTorch releases (``data_source`` required in older ones,
        # deprecated or absent in newer ones) - TODO confirm against the
        # installed torch. Try the no-argument form first and fall back to
        # passing ``data_source`` if that raises TypeError.
        try:
            super().__init__()
        except TypeError:
            super().__init__(data_source)
        self.labels = data_source.labels
        self.ids_pos = np.where(self.labels[:, 0] == 1)[0]
        self.ids_neg = np.where(self.labels[:, 0] == 0)[0]

    def __iter__(self):
        """Return an iterator over a freshly drawn, shuffled balanced index set."""
        n_pos = self.ids_pos.shape[0]
        n_neg = self.ids_neg.shape[0]
        if n_neg >= n_pos:
            ids_pos = self.ids_pos
            ids_neg = np.random.choice(self.ids_neg, n_pos, replace=False)
        else:
            # Sampling n_pos negatives without replacement would raise here, so
            # subsample the positives instead to keep the 50:50 ratio.
            ids_pos = np.random.choice(self.ids_pos, n_neg, replace=False)
            ids_neg = self.ids_neg
        ids = np.concatenate([ids_pos, ids_neg])
        np.random.shuffle(ids)
        return iter(ids)

    def __len__(self):
        """Return the number of indices produced by one pass of ``__iter__``."""
        return min(self.ids_pos.shape[0], self.ids_neg.shape[0]) * 2