# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import numpy as np
from .proposal_utils import temporal_iop, temporal_iou
def generate_candidate_proposals(video_list,
video_infos,
tem_results_dir,
temporal_scale,
peak_threshold,
tem_results_ext='.csv',
result_dict=None):
"""Generate Candidate Proposals with given temporal evaluation results.
Each proposal file will contain:
'tmin,tmax,tmin_score,tmax_score,score,match_iou,match_ioa'.
Args:
video_list (list[int]): List of video indexes to generate proposals.
video_infos (list[dict]): List of video_info dict that contains
'video_name', 'duration_frame', 'duration_second',
'feature_frame', and 'annotations'.
tem_results_dir (str): Directory to load temporal evaluation
results.
temporal_scale (int): The number (scale) on temporal axis.
peak_threshold (float): The threshold for proposal generation.
tem_results_ext (str): File extension for temporal evaluation
model output. Default: '.csv'.
result_dict (dict | None): The dict to save the results. Default: None.
Returns:
dict: A dict contains video_name as keys and proposal list as value.
If result_dict is not None, save the results to it.
"""
if tem_results_ext != '.csv':
raise NotImplementedError('Only support csv format now.')
tscale = temporal_scale
tgap = 1. / tscale
proposal_dict = {}
for video_index in video_list:
video_name = video_infos[video_index]['video_name']
tem_path = osp.join(tem_results_dir, video_name + tem_results_ext)
tem_results = np.loadtxt(
tem_path, dtype=np.float32, delimiter=',', skiprows=1)
start_scores = tem_results[:, 1]
end_scores = tem_results[:, 2]
max_start = max(start_scores)
max_end = max(end_scores)
start_bins = np.zeros(len(start_scores))
start_bins[[0, -1]] = 1
end_bins = np.zeros(len(end_scores))
end_bins[[0, -1]] = 1
for idx in range(1, tscale - 1):
if start_scores[idx] > start_scores[
idx + 1] and start_scores[idx] > start_scores[idx - 1]:
start_bins[idx] = 1
elif start_scores[idx] > (peak_threshold * max_start):
start_bins[idx] = 1
if end_scores[idx] > end_scores[
idx + 1] and end_scores[idx] > end_scores[idx - 1]:
end_bins[idx] = 1
elif end_scores[idx] > (peak_threshold * max_end):
end_bins[idx] = 1
tmin_list = []
tmin_score_list = []
tmax_list = []
tmax_score_list = []
for idx in range(tscale):
if start_bins[idx] == 1:
tmin_list.append(tgap / 2 + tgap * idx)
tmin_score_list.append(start_scores[idx])
if end_bins[idx] == 1:
tmax_list.append(tgap / 2 + tgap * idx)
tmax_score_list.append(end_scores[idx])
new_props = []
for tmax, tmax_score in zip(tmax_list, tmax_score_list):
for tmin, tmin_score in zip(tmin_list, tmin_score_list):
if tmin >= tmax:
break
new_props.append([tmin, tmax, tmin_score, tmax_score])
new_props = np.stack(new_props)
score = (new_props[:, 2] * new_props[:, 3]).reshape(-1, 1)
new_props = np.concatenate((new_props, score), axis=1)
new_props = new_props[new_props[:, -1].argsort()[::-1]]
video_info = video_infos[video_index]
video_frame = video_info['duration_frame']
video_second = video_info['duration_second']
feature_frame = video_info['feature_frame']
corrected_second = float(feature_frame) / video_frame * video_second
gt_tmins = []
gt_tmaxs = []
for annotations in video_info['annotations']:
gt_tmins.append(annotations['segment'][0] / corrected_second)
gt_tmaxs.append(annotations['segment'][1] / corrected_second)
new_iou_list = []
new_ioa_list = []
for new_prop in new_props:
new_iou = max(
temporal_iou(new_prop[0], new_prop[1], gt_tmins, gt_tmaxs))
new_ioa = max(
temporal_iop(new_prop[0], new_prop[1], gt_tmins, gt_tmaxs))
new_iou_list.append(new_iou)
new_ioa_list.append(new_ioa)
new_iou_list = np.array(new_iou_list).reshape(-1, 1)
new_ioa_list = np.array(new_ioa_list).reshape(-1, 1)
new_props = np.concatenate((new_props, new_iou_list), axis=1)
new_props = np.concatenate((new_props, new_ioa_list), axis=1)
proposal_dict[video_name] = new_props
if result_dict is not None:
result_dict[video_name] = new_props
return proposal_dict
def generate_bsp_feature(video_list,
video_infos,
tem_results_dir,
pgm_proposals_dir,
top_k=1000,
bsp_boundary_ratio=0.2,
num_sample_start=8,
num_sample_end=8,
num_sample_action=16,
num_sample_interp=3,
tem_results_ext='.csv',
pgm_proposal_ext='.csv',
result_dict=None):
"""Generate Boundary-Sensitive Proposal Feature with given proposals.
Args:
video_list (list[int]): List of video indexes to generate bsp_feature.
video_infos (list[dict]): List of video_info dict that contains
'video_name'.
tem_results_dir (str): Directory to load temporal evaluation
results.
pgm_proposals_dir (str): Directory to load proposals.
top_k (int): Number of proposals to be considered. Default: 1000
bsp_boundary_ratio (float): Ratio for proposal boundary
(start/end). Default: 0.2.
num_sample_start (int): Num of samples for actionness in
start region. Default: 8.
num_sample_end (int): Num of samples for actionness in end region.
Default: 8.
num_sample_action (int): Num of samples for actionness in center
region. Default: 16.
num_sample_interp (int): Num of samples for interpolation for
each sample point. Default: 3.
tem_results_ext (str): File extension for temporal evaluation
model output. Default: '.csv'.
pgm_proposal_ext (str): File extension for proposals. Default: '.csv'.
result_dict (dict | None): The dict to save the results. Default: None.
Returns:
bsp_feature_dict (dict): A dict contains video_name as keys and
bsp_feature as value. If result_dict is not None, save the
results to it.
"""
if tem_results_ext != '.csv' or pgm_proposal_ext != '.csv':
raise NotImplementedError('Only support csv format now.')
bsp_feature_dict = {}
for video_index in video_list:
video_name = video_infos[video_index]['video_name']
# Load temporal evaluation results
tem_path = osp.join(tem_results_dir, video_name + tem_results_ext)
tem_results = np.loadtxt(
tem_path, dtype=np.float32, delimiter=',', skiprows=1)
score_action = tem_results[:, 0]
seg_tmins = tem_results[:, 3]
seg_tmaxs = tem_results[:, 4]
video_scale = len(tem_results)
video_gap = seg_tmaxs[0] - seg_tmins[0]
video_extend = int(video_scale / 4 + 10)
# Load proposals results
proposal_path = osp.join(pgm_proposals_dir,
video_name + pgm_proposal_ext)
pgm_proposals = np.loadtxt(
proposal_path, dtype=np.float32, delimiter=',', skiprows=1)
pgm_proposals = pgm_proposals[:top_k]
# Generate temporal sample points
boundary_zeros = np.zeros([video_extend])
score_action = np.concatenate(
(boundary_zeros, score_action, boundary_zeros))
begin_tp = []
middle_tp = []
end_tp = []
for i in range(video_extend):
begin_tp.append(-video_gap / 2 -
(video_extend - 1 - i) * video_gap)
end_tp.append(video_gap / 2 + seg_tmaxs[-1] + i * video_gap)
for i in range(video_scale):
middle_tp.append(video_gap / 2 + i * video_gap)
t_points = begin_tp + middle_tp + end_tp
bsp_feature = []
for pgm_proposal in pgm_proposals:
tmin = pgm_proposal[0]
tmax = pgm_proposal[1]
tlen = tmax - tmin
# Temporal range for start
tmin_0 = tmin - tlen * bsp_boundary_ratio
tmin_1 = tmin + tlen * bsp_boundary_ratio
# Temporal range for end
tmax_0 = tmax - tlen * bsp_boundary_ratio
tmax_1 = tmax + tlen * bsp_boundary_ratio
# Generate features at start boundary
tlen_start = (tmin_1 - tmin_0) / (num_sample_start - 1)
tlen_start_sample = tlen_start / num_sample_interp
t_new = [
tmin_0 - tlen_start / 2 + tlen_start_sample * i
for i in range(num_sample_start * num_sample_interp + 1)
]
y_new_start_action = np.interp(t_new, t_points, score_action)
y_new_start = [
np.mean(y_new_start_action[i * num_sample_interp:(i + 1) *
num_sample_interp + 1])
for i in range(num_sample_start)
]
# Generate features at end boundary
tlen_end = (tmax_1 - tmax_0) / (num_sample_end - 1)
tlen_end_sample = tlen_end / num_sample_interp
t_new = [
tmax_0 - tlen_end / 2 + tlen_end_sample * i
for i in range(num_sample_end * num_sample_interp + 1)
]
y_new_end_action = np.interp(t_new, t_points, score_action)
y_new_end = [
np.mean(y_new_end_action[i * num_sample_interp:(i + 1) *
num_sample_interp + 1])
for i in range(num_sample_end)
]
# Generate features for action
tlen_action = (tmax - tmin) / (num_sample_action - 1)
tlen_action_sample = tlen_action / num_sample_interp
t_new = [
tmin - tlen_action / 2 + tlen_action_sample * i
for i in range(num_sample_action * num_sample_interp + 1)
]
y_new_action = np.interp(t_new, t_points, score_action)
y_new_action = [
np.mean(y_new_action[i * num_sample_interp:(i + 1) *
num_sample_interp + 1])
for i in range(num_sample_action)
]
feature = np.concatenate([y_new_action, y_new_start, y_new_end])
bsp_feature.append(feature)
bsp_feature = np.array(bsp_feature)
bsp_feature_dict[video_name] = bsp_feature
if result_dict is not None:
result_dict[video_name] = bsp_feature
return bsp_feature_dict