Switch to side-by-side view

--- a
+++ b/opengait/data/transform.py
@@ -0,0 +1,590 @@
+import numpy as np
+import random
+import torchvision.transforms as T
+import cv2
+import math
+from data import transform as base_transform
+from utils import is_list, is_dict, get_valid_args
+
+
+class NoOperation():
+    def __call__(self, x):
+        return x
+
+
+class BaseSilTransform():
+    def __init__(self, divsor=255.0, img_shape=None):
+        self.divsor = divsor
+        self.img_shape = img_shape
+
+    def __call__(self, x):
+        if self.img_shape is not None:
+            s = x.shape[0]
+            _ = [s] + [*self.img_shape]
+            x = x.reshape(*_)
+        return x / self.divsor
+
+
+class BaseParsingCuttingTransform():
+    def __init__(self, divsor=255.0, cutting=None):
+        self.divsor = divsor
+        self.cutting = cutting
+
+    def __call__(self, x):
+        if self.cutting is not None:
+            cutting = self.cutting
+        else:
+            cutting = int(x.shape[-1] // 64) * 10
+        if cutting != 0: 
+            x = x[..., cutting:-cutting]
+        if x.max() == 255 or x.max() == 255.:
+            return x / self.divsor
+        else:
+            return x / 1.0
+
+
+class BaseSilCuttingTransform():
+    def __init__(self, divsor=255.0, cutting=None):
+        self.divsor = divsor
+        self.cutting = cutting
+
+    def __call__(self, x):
+        if self.cutting is not None:
+            cutting = self.cutting
+        else:
+            cutting = int(x.shape[-1] // 64) * 10
+        if cutting != 0: 
+            x = x[..., cutting:-cutting]
+        return x / self.divsor
+
+
+class BaseRgbTransform():
+    def __init__(self, mean=None, std=None):
+        if mean is None:
+            mean = [0.485*255, 0.456*255, 0.406*255]
+        if std is None:
+            std = [0.229*255, 0.224*255, 0.225*255]
+        self.mean = np.array(mean).reshape((1, 3, 1, 1))
+        self.std = np.array(std).reshape((1, 3, 1, 1))
+
+    def __call__(self, x):
+        return (x - self.mean) / self.std
+
+
+# **************** Data Agumentation ****************
+
+
+class RandomHorizontalFlip(object):
+    def __init__(self, prob=0.5):
+        self.prob = prob
+
+    def __call__(self, seq):
+        if random.uniform(0, 1) >= self.prob:
+            return seq
+        else:
+            return seq[..., ::-1]
+
+
+class RandomErasing(object):
+    def __init__(self, prob=0.5, sl=0.05, sh=0.2, r1=0.3, per_frame=False):
+        self.prob = prob
+        self.sl = sl
+        self.sh = sh
+        self.r1 = r1
+        self.per_frame = per_frame
+
+    def __call__(self, seq):
+        if not self.per_frame:
+            if random.uniform(0, 1) >= self.prob:
+                return seq
+            else:
+                for _ in range(100):
+                    seq_size = seq.shape
+                    area = seq_size[1] * seq_size[2]
+
+                    target_area = random.uniform(self.sl, self.sh) * area
+                    aspect_ratio = random.uniform(self.r1, 1 / self.r1)
+
+                    h = int(round(math.sqrt(target_area * aspect_ratio)))
+                    w = int(round(math.sqrt(target_area / aspect_ratio)))
+
+                    if w < seq_size[2] and h < seq_size[1]:
+                        x1 = random.randint(0, seq_size[1] - h)
+                        y1 = random.randint(0, seq_size[2] - w)
+                        seq[:, x1:x1+h, y1:y1+w] = 0.
+                        return seq
+            return seq
+        else:
+            self.per_frame = False
+            frame_num = seq.shape[0]
+            ret = [self.__call__(seq[k][np.newaxis, ...])
+                   for k in range(frame_num)]
+            self.per_frame = True
+            return np.concatenate(ret, 0)
+
+
+class RandomRotate(object):
+    def __init__(self, prob=0.5, degree=10):
+        self.prob = prob
+        self.degree = degree
+
+    def __call__(self, seq):
+        if random.uniform(0, 1) >= self.prob:
+            return seq
+        else:
+            dh, dw = seq.shape[-2:]
+            # rotation
+            degree = random.uniform(-self.degree, self.degree)
+            M1 = cv2.getRotationMatrix2D((dh // 2, dw // 2), degree, 1)
+            # affine
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 2, 3, 1)
+            seq = [cv2.warpAffine(_[0, ...], M1, (dw, dh))
+                for _ in np.split(seq, seq.shape[0], axis=0)]
+            seq = np.concatenate([np.array(_)[np.newaxis, ...]
+                                 for _ in seq], 0)
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 3, 1, 2)
+            return seq
+
+
+class RandomPerspective(object):
+    def __init__(self, prob=0.5):
+        self.prob = prob
+
+    def __call__(self, seq):
+        if random.uniform(0, 1) >= self.prob:
+            return seq
+        else:
+            h, w = seq.shape[-2:]
+            cutting = int(w // 44) * 10
+            x_left = list(range(0, cutting))
+            x_right = list(range(w - cutting, w))
+            TL = (random.choice(x_left), 0)
+            TR = (random.choice(x_right), 0)
+            BL = (random.choice(x_left), h)
+            BR = (random.choice(x_right), h)
+            srcPoints = np.float32([TL, TR, BR, BL])
+            canvasPoints = np.float32([[0, 0], [w, 0], [w, h], [0, h]])
+            perspectiveMatrix = cv2.getPerspectiveTransform(
+                np.array(srcPoints), np.array(canvasPoints))
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 2, 3, 1)
+            seq = [cv2.warpPerspective(_[0, ...], perspectiveMatrix, (w, h))
+                   for _ in np.split(seq, seq.shape[0], axis=0)]
+            seq = np.concatenate([np.array(_)[np.newaxis, ...]
+                                 for _ in seq], 0)
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 3, 1, 2)
+            return seq
+
+
+class RandomAffine(object):
+    def __init__(self, prob=0.5, degree=10):
+        self.prob = prob
+        self.degree = degree
+
+    def __call__(self, seq):
+        if random.uniform(0, 1) >= self.prob:
+            return seq
+        else:
+            dh, dw = seq.shape[-2:]
+            # rotation
+            max_shift = int(dh // 64 * 10)
+            shift_range = list(range(0, max_shift))
+            pts1 = np.float32([[random.choice(shift_range), random.choice(shift_range)], [
+                              dh-random.choice(shift_range), random.choice(shift_range)], [random.choice(shift_range), dw-random.choice(shift_range)]])
+            pts2 = np.float32([[random.choice(shift_range), random.choice(shift_range)], [
+                              dh-random.choice(shift_range), random.choice(shift_range)], [random.choice(shift_range), dw-random.choice(shift_range)]])
+            M1 = cv2.getAffineTransform(pts1, pts2)
+            # affine
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 2, 3, 1)
+            seq = [cv2.warpAffine(_[0, ...], M1, (dw, dh))
+                   for _ in np.split(seq, seq.shape[0], axis=0)]
+            seq = np.concatenate([np.array(_)[np.newaxis, ...]
+                                 for _ in seq], 0)
+            if len(seq.shape) == 4:
+                seq = seq.transpose(0, 3, 1, 2)
+            return seq
+        
+# ******************************************
+
+def Compose(trf_cfg):
+    assert is_list(trf_cfg)
+    transform = T.Compose([get_transform(cfg) for cfg in trf_cfg])
+    return transform
+
+
+def get_transform(trf_cfg=None):
+    if is_dict(trf_cfg):
+        transform = getattr(base_transform, trf_cfg['type'])
+        valid_trf_arg = get_valid_args(transform, trf_cfg, ['type'])
+        return transform(**valid_trf_arg)
+    if trf_cfg is None:
+        return lambda x: x
+    if is_list(trf_cfg):
+        transform = [get_transform(cfg) for cfg in trf_cfg]
+        return transform
+    raise "Error type for -Transform-Cfg-"
+
+# **************** For GaitSSB ****************
+# Fan, et al: Learning Gait Representation from Massive Unlabelled Walking Videos: A Benchmark, T-PAMI2023
+
+class RandomPartDilate():
+    def __init__(self, prob=0.5, top_range=(12, 16), bot_range=(36, 40)):
+        self.prob = prob
+        self.top_range = top_range
+        self.bot_range = bot_range
+        self.modes_and_kernels = {
+            'RECT': [[5, 3], [5, 5], [3, 5]],
+            'CROSS': [[3, 3], [3, 5], [5, 3]],
+            'ELLIPSE': [[3, 3], [3, 5], [5, 3]]}
+        self.modes = list(self.modes_and_kernels.keys())
+
+    def __call__(self, seq):
+        '''
+            Using the image dialte and affine transformation to simulate the clorhing change cases.
+        Input:
+            seq: a sequence of silhouette frames, [s, h, w]
+        Output:
+            seq: a sequence of agumented frames, [s, h, w]
+        '''
+        if random.uniform(0, 1) >= self.prob:
+                return seq
+        else:
+            mode = random.choice(self.modes)
+            kernel_size = random.choice(self.modes_and_kernels[mode])
+            top = random.randint(self.top_range[0], self.top_range[1])
+            bot = random.randint(self.bot_range[0], self.bot_range[1])
+
+            seq = seq.transpose(1, 2, 0) # [s, h, w] -> [h, w, s]
+            _seq_ = seq.copy()
+            _seq_ = _seq_[top:bot, ...]
+            _seq_ = self.dilate(_seq_, kernel_size=kernel_size, mode=mode)
+            seq[top:bot, ...] = _seq_
+            seq = seq.transpose(2, 0, 1) # [h, w, s] -> [s, h, w]
+            return seq
+
+    def dilate(self, img, kernel_size=[3, 3], mode='RECT'):
+        '''
+            MORPH_RECT, MORPH_CROSS, ELLIPSE
+        Input:
+            img: [h, w]
+        Output:
+            img: [h, w]
+        '''
+        assert mode in ['RECT', 'CROSS', 'ELLIPSE']
+        kernel = cv2.getStructuringElement(getattr(cv2, 'MORPH_'+mode), kernel_size)
+        dst = cv2.dilate(img, kernel)
+        return dst
+
+class RandomPartBlur():
+    def __init__(self, prob=0.5, top_range=(9, 20), bot_range=(29, 40), per_frame=False):
+        self.prob = prob
+        self.top_range = top_range
+        self.bot_range = bot_range
+        self.per_frame = per_frame
+
+    def __call__(self, seq):
+        '''
+        Input:
+            seq: a sequence of silhouette frames, [s, h, w]
+        Output:
+            seq: a sequence of agumented frames, [s, h, w]
+        '''
+        if not self.per_frame:
+            if random.uniform(0, 1) >= self.prob:
+                return seq
+            else:
+                top = random.randint(self.top_range[0], self.top_range[1])
+                bot = random.randint(self.bot_range[0], self.bot_range[1])
+
+                seq = seq.transpose(1, 2, 0) # [s, h, w] -> [h, w, s]
+                _seq_ = seq.copy()
+                _seq_ = _seq_[top:bot, ...]
+                _seq_ = cv2.GaussianBlur(_seq_, ksize=(3, 3), sigmaX=0)
+                _seq_ = (_seq_ > 0.2).astype(np.float)
+                seq[top:bot, ...] = _seq_
+                seq = seq.transpose(2, 0, 1) # [h, w, s] -> [s, h, w]
+
+            return seq
+        else:
+            self.per_frame = False
+            frame_num = seq.shape[0]
+            ret = [self.__call__(seq[k][np.newaxis, ...]) for k in range(frame_num)]
+            self.per_frame = True
+            return np.concatenate(ret, 0)
+
+def DA4GaitSSB(
+    cutting = None,
+    ra_prob = 0.2,
+    rp_prob = 0.2,
+    rhf_prob = 0.5,
+    rpd_prob = 0.2,
+    rpb_prob = 0.2,
+    top_range = (9, 20),
+    bot_range = (39, 50),
+):
+    transform = T.Compose([
+            RandomAffine(prob=ra_prob),
+            RandomPerspective(prob=rp_prob),
+            BaseSilCuttingTransform(cutting=cutting),
+            RandomHorizontalFlip(prob=rhf_prob),
+            RandomPartDilate(prob=rpd_prob, top_range=top_range, bot_range=bot_range),
+            RandomPartBlur(prob=rpb_prob, top_range=top_range, bot_range=bot_range),
+    ])
+    return transform
+
+# **************** For pose-based methods ****************
+class RandomSelectSequence(object):
+    """
+    Randomly select different subsequences
+    """
+    def __init__(self, sequence_length=10):
+        self.sequence_length = sequence_length
+
+    def __call__(self, data):
+        try:
+            start = np.random.randint(0, data.shape[0] - self.sequence_length)
+        except ValueError:
+            raise ValueError("The sequence length of data is too short, which does not meet the requirements.")
+        end = start + self.sequence_length
+        return data[start:end]
+
+
+class SelectSequenceCenter(object):
+    """
+    Select center subsequence
+    """
+    def __init__(self, sequence_length=10):
+        self.sequence_length = sequence_length
+
+    def __call__(self, data):
+        try:
+            start = int((data.shape[0]/2) - (self.sequence_length / 2))
+        except ValueError:
+            raise ValueError("The sequence length of data is too short, which does not meet the requirements.")
+        end = start + self.sequence_length
+        return data[start:end]
+
+
+class MirrorPoses(object):
+    """
+    Performing Mirror Operations
+    """
+    def __init__(self, prob=0.5):
+        self.prob = prob
+
+    def __call__(self, data):
+        if np.random.random() <= self.prob:
+            center = np.mean(data[:, :, 0], axis=1, keepdims=True)
+            data[:, :, 0] = center - data[:, :, 0] + center
+
+        return data
+
+
+class NormalizeEmpty(object):
+    """
+    Normliza Empty Joint
+    """
+    def __call__(self, data):
+        frames, joints = np.where(data[:, :, 0] == 0)
+        for frame, joint in zip(frames, joints):
+            center_of_gravity = np.mean(data[frame], axis=0)
+            data[frame, joint, 0] = center_of_gravity[0]
+            data[frame, joint, 1] = center_of_gravity[1]
+            data[frame, joint, 2] = 0
+        return data
+
+
+class RandomMove(object):
+    """
+    Move: add Random Movement to each joint
+    """
+    def __init__(self,random_r =[4,1]):
+        self.random_r = random_r
+    def __call__(self, data):
+        noise = np.zeros(3)
+        noise[0] = np.random.uniform(-self.random_r[0], self.random_r[0])
+        noise[1] = np.random.uniform(-self.random_r[1], self.random_r[1])
+        data += np.tile(noise,(data.shape[0], data.shape[1], 1))
+        return data
+
+
+class PointNoise(object):
+    """
+    Add Gaussian noise to pose points
+    std: standard deviation
+    """
+    def __init__(self, std=0.01):
+        self.std = std
+
+    def __call__(self, data):
+        noise = np.random.normal(0, self.std, data.shape).astype(np.float32)
+        return data + noise
+
+
+class FlipSequence(object):
+    """
+    Temporal Fliping
+    """
+    def __init__(self, probability=0.5):
+        self.probability = probability
+    def __call__(self, data):
+        if np.random.random() <= self.probability:
+            return np.flip(data,axis=0).copy()
+        return data
+
+
+class InversePosesPre(object):
+    '''
+    Left-right flip of skeletons
+    '''
+    def __init__(self, probability=0.5, joint_format='coco'):
+        self.probability = probability
+        if joint_format == 'coco':
+            self.invers_arr = [0, 2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 16, 15]
+        elif joint_format in ['alphapose', 'openpose']:
+            self.invers_arr = [0, 1, 5, 6, 7, 2, 3, 4, 11, 12, 13, 8, 9, 10, 15, 14, 17, 16]
+        else:
+            raise ValueError("Invalid joint_format.")
+            
+
+    def __call__(self, data):
+        for i in range(len(data)):
+            if np.random.random() <= self.probability:
+                data[i]=data[i,self.invers_arr,:]
+        return data
+
+
+class JointNoise(object):
+    """
+    Add Gaussian noise to joint
+    std: standard deviation
+    """
+
+    def __init__(self, std=0.25):
+        self.std = std
+
+    def __call__(self, data):
+        # T, V, C
+        noise = np.hstack((
+            np.random.normal(0, self.std, (data.shape[1], 2)),
+            np.zeros((data.shape[1], 1))
+        )).astype(np.float32)
+
+        return data + np.repeat(noise[np.newaxis, ...], data.shape[0], axis=0)
+
+
+class GaitTRMultiInput(object):
+    def __init__(self, joint_format='coco',):
+        if joint_format == 'coco':
+            self.connect_joint = np.array([5,0,0,1,2,0,0,5,6,7,8,5,6,11,12,13,14])
+        elif joint_format in ['alphapose', 'openpose']:
+            self.connect_joint = np.array([1,1,1,2,3,1,5,6,2,8,9,5,11,12,0,0,14,15])
+        else:
+            raise ValueError("Invalid joint_format.")
+
+    def __call__(self, data):
+        # (C, T, V) -> (I, C * 2, T, V)
+        data = np.transpose(data, (2, 0, 1))
+
+        data = data[:2, :, :]
+
+        C, T, V = data.shape
+        data_new = np.zeros((5, C, T, V))
+        # Joints
+        data_new[0, :C, :, :] = data
+        for i in range(V):
+            data_new[1, :, :, i] = data[:, :, i] - data[:, :, 0]
+        # Velocity
+        for i in range(T - 2):
+            data_new[2, :, i, :] = data[:, i + 1, :] - data[:, i, :]
+            data_new[3, :, i, :] = data[:, i + 2, :] - data[:, i, :]
+        # Bones
+        for i in range(len(self.connect_joint)):
+            data_new[4, :, :, i] = data[:, :, i] - data[:, :, self.connect_joint[i]]
+        
+        I, C, T, V = data_new.shape
+        data_new = data_new.reshape(I*C, T, V)
+        # (C T V) -> (T V C)
+        data_new = np.transpose(data_new, (1, 2, 0))
+
+        return data_new
+
+
+class GaitGraphMultiInput(object):
+    def __init__(self, center=0, joint_format='coco'):
+        self.center = center
+        if joint_format == 'coco':
+            self.connect_joint = np.array([5,0,0,1,2,0,0,5,6,7,8,5,6,11,12,13,14])
+        elif joint_format in ['alphapose', 'openpose']:
+            self.connect_joint = np.array([1,1,1,2,3,1,5,6,2,8,9,5,11,12,0,0,14,15])
+        else:
+            raise ValueError("Invalid joint_format.")
+
+    def __call__(self, data):
+        T, V, C = data.shape
+        x_new = np.zeros((T, V, 3, C + 2))
+        # Joints
+        x = data
+        x_new[:, :, 0, :C] = x
+        for i in range(V):
+            x_new[:, i, 0, C:] = x[:, i, :2] - x[:, self.center, :2]
+        # Velocity
+        for i in range(T - 2):
+            x_new[i, :, 1, :2] = x[i + 1, :, :2] - x[i, :, :2]
+            x_new[i, :, 1, 3:] = x[i + 2, :, :2] - x[i, :, :2]
+        x_new[:, :, 1, 3] = x[:, :, 2]
+        # Bones
+        for i in range(V):
+            x_new[:, i, 2, :2] = x[:, i, :2] - x[:, self.connect_joint[i], :2]
+        # Angles
+        bone_length = 0
+        for i in range(C - 1):
+            bone_length += np.power(x_new[:, :, 2, i], 2)
+        bone_length = np.sqrt(bone_length) + 0.0001
+        for i in range(C - 1):
+            x_new[:, :, 2, C+i] = np.arccos(x_new[:, :, 2, i] / bone_length)
+        x_new[:, :, 2, 3] = x[:, :, 2]
+        return x_new
+
+class GaitGraph1Input(object):
+    '''
+    Transpose the input
+    '''
+    def __call__(self, data):
+        # (T V C) -> (C T V)
+        data = np.transpose(data, (2, 0, 1))
+        return data[...,np.newaxis]
+
+class SkeletonInput(object):
+    '''
+    Transpose the input
+    '''
+    def __call__(self, data):
+        # (T V C) -> (T C V)
+        data = np.transpose(data, (0, 2, 1))
+        return data[...,np.newaxis]
+
+class TwoView(object):
+    def __init__(self,trf_cfg):
+        assert is_list(trf_cfg)
+        self.transform = T.Compose([get_transform(cfg) for cfg in trf_cfg])
+    def __call__(self, data):
+        return np.concatenate([self.transform(data), self.transform(data)], axis=1)
+
+
+class MSGGTransform():
+    def __init__(self, joint_format="coco"):
+        if joint_format == "coco": #17
+            self.mask=[6,8,14,12,7,13,5,10,16,11,9,15]
+        elif joint_format in ['alphapose', 'openpose']: #18
+            self.mask=[2,3,9,8,6,12,5,4,10,11,7,13]
+        else:
+            raise ValueError("Invalid joint_format.")
+        
+    def __call__(self, x):
+        result=x[...,self.mask,:].copy()
+        return result