# Copyright (c) OpenMMLab. All rights reserved.
import csv
import fnmatch
import glob
import json
import os
import os.path as osp
def parse_directory(path,
                    rgb_prefix='img_',
                    flow_x_prefix='flow_x_',
                    flow_y_prefix='flow_y_',
                    level=1):
    """Parse directories holding extracted frames from standard benchmarks.

    Args:
        path (str): Directory path to parse frames.
        rgb_prefix (str): Prefix of generated rgb frames name.
            default: 'img_'.
        flow_x_prefix (str): Prefix of generated flow x name.
            default: `flow_x_`.
        flow_y_prefix (str): Prefix of generated flow y name.
            default: `flow_y_`.
        level (int): Directory level for glob searching. Options are 1 and 2.
            default: 1.

    Returns:
        dict: frame info dict with video id as key and tuple(path(str),
            rgb_num(int), flow_x_num(int)) as value.

    Raises:
        ValueError: If ``level`` is not 1 or 2, or if a video directory has
            mismatched numbers of flow_x and flow_y images.
    """
    print(f'parse frames under directory {path}')
    if level == 1:
        # One-level layout: the video id is simply the directory name.
        frame_dirs = glob.glob(osp.join(path, '*'))
        video_id = osp.basename
    elif level == 2:
        # Two-level layout: the video id keeps its parent (class) directory.
        frame_dirs = glob.glob(osp.join(path, '*', '*'))

        def video_id(frame_dir):
            return osp.join(
                osp.basename(osp.dirname(frame_dir)), osp.basename(frame_dir))
    else:
        raise ValueError('level can be only 1 or 2')

    prefixes = (rgb_prefix, flow_x_prefix, flow_y_prefix)
    frame_dict = {}
    for idx, frame_dir in enumerate(frame_dirs):
        # Count files matching each prefix inside this video directory.
        entries = os.listdir(frame_dir)
        num_rgb, num_flow_x, num_flow_y = (
            len(fnmatch.filter(entries, prefix + '*')) for prefix in prefixes)
        if num_flow_x != num_flow_y:
            raise ValueError(f'x and y direction have different number '
                             f'of flow images in video directory: {frame_dir}')
        if idx % 200 == 0:
            print(f'{idx} videos parsed')
        frame_dict[video_id(frame_dir)] = (frame_dir, num_rgb, num_flow_x)
    print('frame directory analysis done')
    return frame_dict
def parse_ucf101_splits(level):
    """Parse UCF-101 dataset into "train", "val", "test" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of UCF-101.
    """
    class_index_file = 'data/ucf101/annotations/classInd.txt'
    train_file_template = 'data/ucf101/annotations/trainlist{:02d}.txt'
    test_file_template = 'data/ucf101/annotations/testlist{:02d}.txt'
    with open(class_index_file, 'r') as fin:
        class_index = [x.strip().split() for x in fin]
    # ``classInd.txt`` uses 1-based class ids; shift to 0-based labels.
    class_mapping = {x[1]: int(x[0]) - 1 for x in class_index}

    def line_to_map(line):
        """A function to map line string to video and label.

        Args:
            line (str): One annotation line, e.g.
                ``ClassName/v_xxx.avi [class_id]``.

        Returns:
            tuple[str, int]: (video, label), video is the video id,
                label is the 0-based class index.
        """
        items = line.strip().split()
        video = osp.splitext(items[0])[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        # BUGFIX: for level 1 the label used to be ``items[0]`` (the raw
        # annotation path), not a class index, contradicting the docstring.
        # Derive the label from the class directory for both levels, the
        # same way the HMDB51 parser in this file does.
        label = class_mapping[osp.dirname(items[0])]
        return video, label

    splits = []
    for i in range(1, 4):
        with open(train_file_template.format(i), 'r') as fin:
            train_list = [line_to_map(x) for x in fin]
        with open(test_file_template.format(i), 'r') as fin:
            test_list = [line_to_map(x) for x in fin]
        splits.append((train_list, test_list))
    return splits
def parse_jester_splits(level):
    """Parse Jester into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Jester dataset.
    """
    # Annotation file locations.
    class_index_file = 'data/jester/annotations/jester-v1-labels.csv'
    train_file = 'data/jester/annotations/jester-v1-train.csv'
    val_file = 'data/jester/annotations/jester-v1-validation.csv'
    test_file = 'data/jester/annotations/jester-v1-test.csv'

    # One class name per line; its (0-based) line index is its label id.
    with open(class_index_file, 'r') as fin:
        class_mapping = {name.strip(): idx for idx, name in enumerate(fin)}

    def line_to_map(line, test_mode=False):
        """Map one ``video;label`` csv line to its split entry."""
        fields = line.strip().split(';')
        video = fields[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            parent = osp.basename(osp.dirname(video))
            video = osp.join(parent, osp.basename(video))
        if test_mode:
            # Test annotations carry no label column.
            return video
        return video, class_mapping[fields[1]]

    def read_split(filename, test_mode=False):
        """Load one annotation file as a list of split entries."""
        with open(filename, 'r') as fin:
            return [line_to_map(line, test_mode=test_mode) for line in fin]

    return ((read_split(train_file), read_split(val_file),
             read_split(test_file, test_mode=True)), )
def parse_sthv1_splits(level):
    """Parse Something-Something dataset V1 into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Something-Something V1 dataset.
    """
    # Annotation file locations.
    # yapf: disable
    class_index_file = 'data/sthv1/annotations/something-something-v1-labels.csv'  # noqa
    # yapf: enable
    train_file = 'data/sthv1/annotations/something-something-v1-train.csv'
    val_file = 'data/sthv1/annotations/something-something-v1-validation.csv'
    test_file = 'data/sthv1/annotations/something-something-v1-test.csv'

    # One class name per line; its (0-based) line index is its label id.
    with open(class_index_file, 'r') as fin:
        class_mapping = {name.strip(): idx for idx, name in enumerate(fin)}

    def line_to_map(line, test_mode=False):
        """Map one ``video;label`` csv line to its split entry."""
        fields = line.strip().split(';')
        video = fields[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            parent = osp.basename(osp.dirname(video))
            video = osp.join(parent, osp.basename(video))
        if test_mode:
            # Test annotations carry no label column.
            return video
        return video, class_mapping[fields[1]]

    def read_split(filename, test_mode=False):
        """Load one annotation file as a list of split entries."""
        with open(filename, 'r') as fin:
            return [line_to_map(line, test_mode=test_mode) for line in fin]

    return ((read_split(train_file), read_split(val_file),
             read_split(test_file, test_mode=True)), )
def parse_sthv2_splits(level):
    """Parse Something-Something dataset V2 into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Something-Something V2 dataset.
    """
    # Annotation file locations.
    # yapf: disable
    class_index_file = 'data/sthv2/annotations/something-something-v2-labels.json'  # noqa
    # yapf: enable
    train_file = 'data/sthv2/annotations/something-something-v2-train.json'
    val_file = 'data/sthv2/annotations/something-something-v2-validation.json'
    test_file = 'data/sthv2/annotations/something-something-v2-test.json'

    # Mapping of template string -> label id (stored as str in the json).
    with open(class_index_file, 'r') as fin:
        class_mapping = json.load(fin)

    def item_to_entry(item, test_mode=False):
        """Map one parsed json record to its split entry."""
        video = item['id']
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            parent = osp.basename(osp.dirname(video))
            video = osp.join(parent, osp.basename(video))
        if test_mode:
            # Test annotations carry no template/label.
            return video
        # Strip the placeholder brackets so the template matches the keys
        # of ``class_mapping``.
        template = item['template'].replace('[', '').replace(']', '')
        return video, int(class_mapping[template])

    def read_split(filename, test_mode=False):
        """Load one json annotation file as a list of split entries."""
        with open(filename, 'r') as fin:
            records = json.load(fin)
        return [item_to_entry(rec, test_mode=test_mode) for rec in records]

    return ((read_split(train_file), read_split(val_file),
             read_split(test_file, test_mode=True)), )
def parse_mmit_splits():
    """Parse Multi-Moments in Time dataset into "train", "val" splits.

    Returns:
        list: "train", "val", "test" splits of Multi-Moments in Time.
    """

    def line_to_map(x):
        """Map one parsed csv row to (video, labels).

        Args:
            x (list): One csv row; ``x[0]`` is the video path, the
                remaining columns are label ids.

        Returns:
            tuple[str, list[int]]: (video id without extension, label ids).
        """
        video = osp.splitext(x[0])[0]
        labels = [int(digit) for digit in x[1:]]
        return video, labels

    # BUGFIX: the annotation files were opened via ``csv.reader(open(...))``
    # and the handles were never closed; use context managers instead.
    with open('data/mmit/annotations/trainingSet.csv') as fin:
        train_list = [line_to_map(x) for x in csv.reader(fin)]
    with open('data/mmit/annotations/validationSet.csv') as fin:
        val_list = [line_to_map(x) for x in csv.reader(fin)]
    test_list = val_list  # not test for mit
    splits = ((train_list, val_list, test_list), )
    return splits
def parse_kinetics_splits(level, dataset):
    """Parse Kinetics dataset into "train", "val", "test" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.
        dataset (str): Denotes the version of Kinetics that needs to be parsed,
            choices are "kinetics400", "kinetics600" and "kinetics700".

    Returns:
        list: "train", "val", "test" splits of Kinetics.
    """

    def convert_label(s, keep_whitespaces=False):
        """Convert label name to a formal string.

        Remove redundant '"' and convert whitespace to '_'.

        Args:
            s (str): String to be converted.
            keep_whitespaces(bool): Whether to keep whitespace. Default: False.

        Returns:
            str: Converted string.
        """
        if not keep_whitespaces:
            return s.replace('"', '').replace(' ', '_')
        return s.replace('"', '')

    def line_to_map(x, test=False):
        """A function to map line string to video and label.

        Args:
            x (list): A single parsed csv row,
                ``[label, youtube_id, time_start, time_end, ...]``.
            test (bool): Indicate whether the line comes from test
                annotation file.

        Returns:
            tuple[str, int]: (video, label), video is the video id,
                label is the video label (-1 for the test split).
        """
        if test:
            video = f'{x[1]}_{int(float(x[2])):06d}_{int(float(x[3])):06d}'
            label = -1  # label unknown
            return video, label
        video = f'{x[1]}_{int(float(x[2])):06d}_{int(float(x[3])):06d}'
        if level == 2:
            # Two-level layout: prefix the class directory.
            video = f'{convert_label(x[0])}/{video}'
        else:
            assert level == 1
        label = class_mapping[convert_label(x[0])]
        return video, label

    train_file = f'data/{dataset}/annotations/kinetics_train.csv'
    val_file = f'data/{dataset}/annotations/kinetics_val.csv'
    test_file = f'data/{dataset}/annotations/kinetics_test.csv'

    def read_rows(filename):
        """Read one csv annotation file, skipping its header row.

        BUGFIX: the files used to be opened via ``csv.reader(open(...))``
        and were never closed; the context manager releases each handle.
        """
        with open(filename) as fin:
            reader = csv.reader(fin)
            next(reader)  # skip the first (header) line
            return list(reader)

    # Class ids are assigned by sorted order of the train-set class names.
    labels_sorted = sorted(
        {convert_label(row[0]) for row in read_rows(train_file)})
    class_mapping = {label: i for i, label in enumerate(labels_sorted)}

    train_list = [line_to_map(x) for x in read_rows(train_file)]
    val_list = [line_to_map(x) for x in read_rows(val_file)]
    test_list = [line_to_map(x, test=True) for x in read_rows(test_file)]
    splits = ((train_list, val_list, test_list), )
    return splits
def parse_mit_splits():
    """Parse Moments in Time dataset into "train", "val" splits.

    Returns:
        list: "train", "val", "test" splits of Moments in Time.
    """
    # Read the category-name -> label-id mapping (``name,id`` per line).
    class_mapping = {}
    with open('data/mit/annotations/moments_categories.txt') as f_cat:
        for line in f_cat:
            cat, digit = line.rstrip().split(',')
            class_mapping[cat] = int(digit)

    def line_to_map(x):
        """Map one parsed csv row to (video, label).

        Args:
            x (list): One csv row; ``x[0]`` is the relative video path
                ``category/clip.ext``.

        Returns:
            tuple[str, int]: (video id without extension, label id).
        """
        video = osp.splitext(x[0])[0]
        label = class_mapping[osp.dirname(x[0])]
        return video, label

    # BUGFIX: the csv files were opened via ``csv.reader(open(...))`` and
    # the handles were never closed; use context managers instead.
    with open('data/mit/annotations/trainingSet.csv') as fin:
        train_list = [line_to_map(x) for x in csv.reader(fin)]
    with open('data/mit/annotations/validationSet.csv') as fin:
        val_list = [line_to_map(x) for x in csv.reader(fin)]
    test_list = val_list  # no test for mit
    splits = ((train_list, val_list, test_list), )
    return splits
def parse_hmdb51_split(level):
    """Parse HMDB51 dataset into "train", "test" splits.

    Generates UCF101-style annotation files (``classInd.txt`` plus
    ``trainlist0{1,2,3}.txt``/``testlist0{1,2,3}.txt``) from the official
    per-class split files, then parses them.

    Args:
        level (int): Directory level of data. 1 for the single-level
            directory, 2 for the two-level directory.

    Returns:
        list: three (train_list, test_list) tuples, one per official split;
            each entry is (video, label) with 0-based labels.
    """
    train_file_template = 'data/hmdb51/annotations/trainlist{:02d}.txt'
    test_file_template = 'data/hmdb51/annotations/testlist{:02d}.txt'
    class_index_file = 'data/hmdb51/annotations/classInd.txt'
    def generate_class_index_file():
        """This function will generate a `ClassInd.txt` for HMDB51 in a format
        like UCF101, where class id starts with 1."""
        video_path = 'data/hmdb51/videos'
        annotation_dir = 'data/hmdb51/annotations'
        # Class names are the sorted sub-directory names under the video root.
        class_list = sorted(os.listdir(video_path))
        class_dict = dict()
        if not osp.exists(class_index_file):
            with open(class_index_file, 'w') as f:
                content = []
                for class_id, class_name in enumerate(class_list):
                    # like `ClassInd.txt` in UCF-101,
                    # the class_id begins with 1
                    class_dict[class_name] = class_id + 1
                    cur_line = ' '.join([str(class_id + 1), class_name])
                    content.append(cur_line)
                content = '\n'.join(content)
                f.write(content)
        else:
            # classInd.txt already exists; rebuild the mapping in memory only.
            print(f'{class_index_file} has been generated before.')
            class_dict = {
                class_name: class_id + 1
                for class_id, class_name in enumerate(class_list)
            }
        # Rewrite the official per-class split files (one file per class and
        # split) into UCF101-style trainlist/testlist files: one
        # "<class>/<video> <1-based label>" line per clip.
        for i in range(1, 4):
            train_content = []
            test_content = []
            for class_name in class_dict:
                filename = class_name + f'_test_split{i}.txt'
                filename_path = osp.join(annotation_dir, filename)
                with open(filename_path, 'r') as fin:
                    for line in fin:
                        video_info = line.strip().split()
                        video_name = video_info[0]
                        # Flag '1' marks a training clip, '2' a test clip;
                        # clips with any other flag are skipped.
                        if video_info[1] == '1':
                            target_line = ' '.join([
                                osp.join(class_name, video_name),
                                str(class_dict[class_name])
                            ])
                            train_content.append(target_line)
                        elif video_info[1] == '2':
                            target_line = ' '.join([
                                osp.join(class_name, video_name),
                                str(class_dict[class_name])
                            ])
                            test_content.append(target_line)
            train_content = '\n'.join(train_content)
            test_content = '\n'.join(test_content)
            with open(train_file_template.format(i), 'w') as fout:
                fout.write(train_content)
            with open(test_file_template.format(i), 'w') as fout:
                fout.write(test_content)
    generate_class_index_file()
    with open(class_index_file, 'r') as fin:
        class_index = [x.strip().split() for x in fin]
    # Shift the 1-based ids in classInd.txt to 0-based labels.
    class_mapping = {x[1]: int(x[0]) - 1 for x in class_index}
    def line_to_map(line):
        """Map a line ``ClassName/video.avi label`` to (video, label)."""
        items = line.strip().split()
        video = osp.splitext(items[0])[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        label = class_mapping[osp.dirname(items[0])]
        return video, label
    splits = []
    for i in range(1, 4):
        with open(train_file_template.format(i), 'r') as fin:
            train_list = [line_to_map(x) for x in fin]
        with open(test_file_template.format(i), 'r') as fin:
            test_list = [line_to_map(x) for x in fin]
        splits.append((train_list, test_list))
    return splits
def parse_diving48_splits():
    """Parse Diving48 (V2 annotations) into "train", "test" splits.

    Returns:
        list: one ("train", "test") tuple; each list entry is
            (vid_name, label).
    """
    train_file = 'data/diving48/annotations/Diving48_V2_train.json'
    test_file = 'data/diving48/annotations/Diving48_V2_test.json'

    def load_split(filename):
        """Read one json annotation file and map items to (vid_name, label).

        BUGFIX: ``json.load(open(...))`` leaked the file handle; the
        context manager closes it.
        """
        with open(filename) as fin:
            return [(item['vid_name'], item['label'])
                    for item in json.load(fin)]

    splits = ((load_split(train_file), load_split(test_file)), )
    return splits