# Copyright (c) OpenMMLab. All rights reserved.
import copy
import os.path as osp
import random
import mmcv
from .base import BaseDataset
from .builder import DATASETS
@DATASETS.register_module()
class RawVideoDataset(BaseDataset):
"""RawVideo dataset for action recognition, used in the Project OmniSource.
The dataset loads clips of raw videos and apply specified transforms to
return a dict containing the frame tensors and other information. Not that
for this dataset, `multi_class` should be False.
The ann_file is a text file with multiple lines, and each line indicates
a sample video with the filepath (without suffix), label, number of clips
and index of positive clips (starting from 0), which are split with a
whitespace. Raw videos should be first trimmed into 10 second clips,
organized in the following format:
.. code-block:: txt
some/path/D32_1gwq35E/part_0.mp4
some/path/D32_1gwq35E/part_1.mp4
......
some/path/D32_1gwq35E/part_n.mp4
Example of a annotation file:
.. code-block:: txt
some/path/D32_1gwq35E 66 10 0 1 2
some/path/-G-5CJ0JkKY 254 5 3 4
some/path/T4h1bvOd9DA 33 1 0
some/path/4uZ27ivBl00 341 2 0 1
some/path/0LfESFkfBSw 186 234 7 9 11
some/path/-YIsNpBEx6c 169 100 9 10 11
The first line indicates that the raw video `some/path/D32_1gwq35E` has
action label `66`, consists of 10 clips (from `part_0.mp4` to
`part_9.mp4`). The 1st, 2nd and 3rd clips are positive clips.
Args:
ann_file (str): Path to the annotation file.
pipeline (list[dict | callable]): A sequence of data transforms.
sampling_strategy (str): The strategy to sample clips from raw videos.
Choices are 'random' or 'positive'. Default: 'positive'.
clipname_tmpl (str): The template of clip name in the raw video.
Default: 'part_{}.mp4'.
**kwargs: Keyword arguments for ``BaseDataset``.
"""
def __init__(self,
ann_file,
pipeline,
clipname_tmpl='part_{}.mp4',
sampling_strategy='positive',
**kwargs):
super().__init__(ann_file, pipeline, start_index=0, **kwargs)
assert self.multi_class is False
self.sampling_strategy = sampling_strategy
self.clipname_tmpl = clipname_tmpl
# If positive, we should only keep those raw videos with positive
# clips
if self.sampling_strategy == 'positive':
self.video_infos = [
x for x in self.video_infos if len(x['positive_clip_inds'])
]
# do not support multi_class
def load_annotations(self):
"""Load annotation file to get video information."""
if self.ann_file.endswith('.json'):
return self.load_json_annotations()
video_infos = []
with open(self.ann_file, 'r') as fin:
for line in fin:
line_split = line.strip().split()
video_dir = line_split[0]
label = int(line_split[1])
num_clips = int(line_split[2])
positive_clip_inds = [int(ind) for ind in line_split[3:]]
if self.data_prefix is not None:
video_dir = osp.join(self.data_prefix, video_dir)
video_infos.append(
dict(
video_dir=video_dir,
label=label,
num_clips=num_clips,
positive_clip_inds=positive_clip_inds))
return video_infos
# do not support multi_class
def load_json_annotations(self):
"""Load json annotation file to get video information."""
video_infos = mmcv.load(self.ann_file)
num_videos = len(video_infos)
path_key = 'video_dir'
for i in range(num_videos):
if self.data_prefix is not None:
path_value = video_infos[i][path_key]
path_value = osp.join(self.data_prefix, path_value)
video_infos[i][path_key] = path_value
return video_infos
def sample_clip(self, results):
"""Sample a clip from the raw video given the sampling strategy."""
assert self.sampling_strategy in ['positive', 'random']
if self.sampling_strategy == 'positive':
assert results['positive_clip_inds']
ind = random.choice(results['positive_clip_inds'])
else:
ind = random.randint(0, results['num_clips'] - 1)
clipname = self.clipname_tmpl.format(ind)
# if the first char of self.clipname_tmpl is a letter, use osp.join;
# otherwise, directly concat them
if self.clipname_tmpl[0].isalpha():
filename = osp.join(results['video_dir'], clipname)
else:
filename = results['video_dir'] + clipname
results['filename'] = filename
return results
def prepare_train_frames(self, idx):
"""Prepare the frames for training given the index."""
results = copy.deepcopy(self.video_infos[idx])
results = self.sample_clip(results)
results['modality'] = self.modality
results['start_index'] = self.start_index
return self.pipeline(results)
def prepare_test_frames(self, idx):
"""Prepare the frames for testing given the index."""
results = copy.deepcopy(self.video_infos[idx])
results = self.sample_clip(results)
results['modality'] = self.modality
results['start_index'] = self.start_index
return self.pipeline(results)