[6b97c3]: / src / utils.py

Download this file

152 lines (124 with data), 4.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import math
from enum import Enum
import numpy as np
import itertools as it
import pandas as pd
import fitting as fit
import warnings
# keep only the records of patients that have 'i' or more data points
# params: study dataframe (needs a 'PatientID' column), minimum number of data points 'i'
# return: dataframe with the records of the qualifying patients; reset_index()
#         renumbers the rows and keeps the previous index as a column
def get_at_least(study, i):
    def has_enough_points(patient_records):
        # count() only counts the non-missing entries of the column
        return patient_records['PatientID'].count() >= i

    per_patient = study.groupby('PatientID')
    kept = per_patient.filter(has_enough_points)
    return kept.reset_index()
# pairwise check if a patient ID is reused across studies
# params: list of studies as dataframes, each with a 'PatientID' column
# return: False if the patient IDs of all studies are pairwise disjoint, True otherwise
#         (fewer than two studies always gives False)
def check_patient_overlap(studies):
    for study1, study2 in it.combinations(studies, 2):
        # compare the two ID columns directly: DataFrame.join(..., on='PatientID')
        # would match study1's IDs against study2's *index*, not its 'PatientID' column
        ids1 = study1['PatientID'].dropna()
        ids2 = study2['PatientID'].dropna()
        # missing IDs are dropped above so that two NaN entries do not count as a shared patient
        if ids1.isin(ids2).any():
            return True
    return False
# converts the time (days) to weeks
# e.g. day 227 => week 32.43
# params: time vector in days (any iterable of numbers)
# return: time vector in weeks as a numpy array
def convert_to_weeks(time):
    days_per_week = 7
    weeks = [day / days_per_week for day in time]
    return np.array(weeks)
# removes measurements before treatment started
# params: study dataframe (needs a 'TreatmentDay' column)
# return: study dataframe with only the records whose 'TreatmentDay' is greater
#         than 0 (day 0 itself is dropped); reset_index() renumbers the rows
#         and keeps the previous index as a column
def filter_treatment_started(study):
    after_start = study['TreatmentDay'] > 0
    started = study[after_start]
    return started.reset_index()
# Trend enum: trajectory type of a series of measurements
class Trend(Enum):
    UP = 1
    FLUCTUATE = 2
    DOWN = 3

    # return: hex color code assigned to this trend
    def color(self):
        # built inside the method: a dict in the class body would be turned
        # into an enum member
        palette = {
            Trend.UP: '#1a9850',
            Trend.FLUCTUATE: '#313695',
            Trend.DOWN: '#d73027',
        }
        return palette[self]

    # orders trends by their numeric value (UP < FLUCTUATE < DOWN) so they can be sorted
    # return: True/False for another Trend, NotImplemented for any other type
    #         (Python then raises TypeError for the unsupported comparison)
    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented
# Recist enum: response category
# CR = complete response, PR = partial response,
# SD = stable disease, PD = progressive disease
class Recist(Enum):
    CR = 1
    PR = 2
    SD = 3
    PD = 4

    # return: hex color code assigned to this category
    def color(self):
        # built inside the method: a dict in the class body would be turned
        # into an enum member
        palette = {
            Recist.CR: '#1a9850',
            Recist.PR: '#fdcc0f',
            Recist.SD: '#313695',
            Recist.PD: '#d73027',
        }
        return palette[self]

    # orders categories by their numeric value (CR < PR < SD < PD) so they can be sorted
    # return: True/False for another Recist, NotImplemented for any other type
    #         (Python then raises TypeError for the unsupported comparison)
    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented
# classify the trajectory of LD data as going up, going down or fluctuating
# based on paper section "Patient categorization according to RECIST and trajectory type"
# params: data point vector
# return: trend enum
def detect_trend(vector):
    points = np.array(vector)
    # change between each pair of consecutive data points
    steps = points[1:] - points[:-1]

    # total rise and total fall over the whole series, both as non-negative sums
    total_rise = np.maximum(steps, 0).sum()
    total_fall = -np.minimum(steps, 0).sum()

    # UP if there is no decrease at all or the rise outweighs the fall by more than 2:1
    # (checked first, so a series without any change is also reported as UP)
    if total_fall == 0 or total_rise / total_fall > 2:
        return Trend.UP

    # DOWN if there is no increase at all or the fall outweighs the rise by more than 2:1
    if total_rise == 0 or total_fall / total_rise > 2:
        return Trend.DOWN

    # neither direction dominates
    return Trend.FLUCTUATE
# categorize the response by comparing the last measured diameter with the
# first measured diameter
# params: data point vector (diameters) with at least one data point;
#         an empty vector raises IndexError
# return: recist enum
def detect_recist(vector):
    v = np.array(vector)
    baseline, final = v[0], v[-1]

    # change between last diameter and first measured diameter
    change = final - baseline

    # CR: (Complete Response) disappearing of all target lesions
    if final == 0:
        return Recist.CR

    # PR: (Partial Response) at least 30% decrease in diameter
    # ("at least" => the 30% boundary itself counts, hence <=)
    if change <= -0.3 * baseline:
        return Recist.PR

    # PD: (Progressive Disease) at least 20% increase in diameter
    # ("at least" => the 20% boundary itself counts, hence >=)
    if change >= 0.2 * baseline:
        return Recist.PD

    # SD: (Stable Disease) none of the above apply
    return Recist.SD
# Akaike information criterion (AIC) computed from the residuals of a fit
# params: k      - number of model parameters (used for the 2k penalty and for df = n - k)
#         y      - observed values (list, tuple or numpy array)
#         y_pred - predicted values, same length as y
#         delta  - True:  return 2k + n * ln(sigma2)
#                  False: return 2k - 2 * ln(L), with
#                         2 * ln(L) = -n * ln(2 * pi) - n * ln(sigma2) - df
# return: AIC value; not finite if n <= k or if the residual sum of squares is 0,
#         because sigma2 = rss / (n - k) is divided and passed to log without a guard
def akaike_information_criterion(k, y, y_pred, delta=True):
    # coerce to arrays so that plain lists/tuples can be subtracted element-wise
    y = np.asarray(y, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    n = len(y)
    df = n - k  # degrees of freedom
    rss = np.sum((y - y_pred) ** 2)  # residual sum of squares
    sigma2 = rss / df  # reduced chi-squared statistic

    if delta:
        return 2 * k + n * np.log(sigma2)

    # max value log-likelihood (doubled)
    lnL2 = - n * np.log(2 * np.pi) - n * np.log(sigma2) - df
    return 2 * k - lnL2
# formats a number as a short string
# params: number n
# return: '0.00000' for zero, scientific notation with 2 decimals if the
#         magnitude is below 0.000001 or above 10, otherwise fixed-point
#         notation with 5 decimals
def format_float(n):
    # zero is handled first; it would otherwise fall into the "very small" case
    if n == 0:
        return '0.00000'

    magnitude = abs(n)
    use_scientific = magnitude < 0.000001 or magnitude > 10
    spec = '.2e' if use_scientific else '.5f'
    return format(n, spec)