import matplotlib.pyplot as plt
import open3d as o3d
# This source is based on https://github.com/AbnerHqC/GaitSet/blob/master/pretreatment.py
import argparse
import logging
import multiprocessing as mp
import os
import pickle
from collections import defaultdict
from functools import partial
from pathlib import Path
from typing import Tuple
import cv2
import numpy as np
from tqdm import tqdm
def align_img(img: np.ndarray, img_size: int = 64) -> np.ndarray:
"""Aligns the image to the center.
Args:
img (np.ndarray): Image to align.
img_size (int, optional): Image resizing size. Defaults to 64.
Returns:
np.ndarray: Aligned image.
"""
if img.sum() <= 10000:
y_top = 0
y_btm = img.shape[0]
else:
# Get the upper and lower points
# img.sum
y_sum = img.sum(axis=2).sum(axis=1)
y_top = (y_sum != 0).argmax(axis=0)
y_btm = (y_sum != 0).cumsum(axis=0).argmax(axis=0)
img = img[y_top: y_btm, :,:]
# As the height of a person is larger than the width,
# use the height to calculate resize ratio.
ratio = img.shape[1] / img.shape[0]
img = cv2.resize(img, (int(img_size * ratio), img_size), interpolation=cv2.INTER_CUBIC)
# Get the median of the x-axis and take it as the person's x-center.
x_csum = img.sum(axis=2).sum(axis=0).cumsum()
x_center = img.shape[1] // 2
for idx, csum in enumerate(x_csum):
if csum > img.sum() / 2:
x_center = idx
break
# if not x_center:
# logging.warning(f'{img_file} has no center.')
# continue
# Get the left and right points
half_width = img_size // 2
left = x_center - half_width
right = x_center + half_width
if left <= 0 or right >= img.shape[1]:
left += half_width
right += half_width
# _ = np.zeros((img.shape[0], half_width,3))
# img = np.concatenate([_, img, _], axis=1)
img = img[:, left: right,:].astype('uint8')
return img
def lidar_to_2d_front_view(points,
v_res,
h_res,
v_fov,
val="depth",
cmap="jet",
saveto=None,
y_fudge=0.0
):
""" Takes points in 3D space from LIDAR data and projects them to a 2D
"front view" image, and saves that image.
Args:
points: (np array)
The numpy array containing the lidar points.
The shape should be Nx4
- Where N is the number of points, and
- each point is specified by 4 values (x, y, z, reflectance)
v_res: (float)
vertical resolution of the lidar sensor used.
h_res: (float)
horizontal resolution of the lidar sensor used.
v_fov: (tuple of two floats)
(minimum_negative_angle, max_positive_angle)
val: (str)
What value to use to encode the points that get plotted.
One of {"depth", "height", "reflectance"}
cmap: (str)
Color map to use to color code the `val` values.
NOTE: Must be a value accepted by matplotlib's scatter function
Examples: "jet", "gray"
saveto: (str or None)
If a string is provided, it saves the image as this filename.
If None, then it just shows the image.
y_fudge: (float)
A hacky fudge factor to use if the theoretical calculations of
vertical range do not match the actual data.
For a Velodyne HDL 64E, set this value to 5.
"""
# DUMMY PROOFING
assert len(v_fov) ==2, "v_fov must be list/tuple of length 2"
assert v_fov[0] <= 0, "first element in v_fov must be 0 or negative"
assert val in {"depth", "height", "reflectance"}, \
'val must be one of {"depth", "height", "reflectance"}'
x_lidar = - points[:, 0]
y_lidar = - points[:, 1]
z_lidar = points[:, 2]
# Distance relative to origin when looked from top
d_lidar = np.sqrt(x_lidar ** 2 + y_lidar ** 2)
# Absolute distance relative to origin
# d_lidar = np.sqrt(x_lidar ** 2 + y_lidar ** 2, z_lidar ** 2)
v_fov_total = -v_fov[0] + v_fov[1]
# Convert to Radians
v_res_rad = v_res * (np.pi/180)
h_res_rad = h_res * (np.pi/180)
# PROJECT INTO IMAGE COORDINATES
x_img = np.arctan2(-y_lidar, x_lidar)/ h_res_rad
y_img = np.arctan2(z_lidar, d_lidar)/ v_res_rad
# SHIFT COORDINATES TO MAKE 0,0 THE MINIMUM
x_min = -360.0 / h_res / 2 # Theoretical min x value based on sensor specs
x_img -= x_min # Shift
x_max = 360.0 / h_res # Theoretical max x value after shifting
y_min = v_fov[0] / v_res # theoretical min y value based on sensor specs
y_img -= y_min # Shift
y_max = v_fov_total / v_res # Theoretical max x value after shifting
y_max += y_fudge # Fudge factor if the calculations based on
# spec sheet do not match the range of
# angles collected by in the data.
# WHAT DATA TO USE TO ENCODE THE VALUE FOR EACH PIXEL
if val == "reflectance":
pass
elif val == "height":
pixel_values = z_lidar
else:
pixel_values = -d_lidar
# pixel_values = 'w'
# PLOT THE IMAGE
cmap = "jet" # Color map to use
dpi = 100 # Image resolution
fig, ax = plt.subplots(figsize=(x_max/dpi, y_max/dpi), dpi=dpi)
ax.scatter(x_img,y_img, s=1, c=pixel_values, linewidths=0, alpha=1, cmap=cmap)
ax.set_facecolor((0, 0, 0)) # Set regions with no points to black
ax.axis('scaled') # {equal, scaled}
ax.xaxis.set_visible(False) # Do not draw axis tick marks
ax.yaxis.set_visible(False) # Do not draw axis tick marks
plt.xlim([0, x_max]) # prevent drawing empty space outside of horizontal FOV
plt.ylim([0, y_max]) # prevent drawing empty space outside of vertical FOV
saveto = saveto.replace('.pcd','.png')
fig.savefig(saveto, dpi=dpi, bbox_inches='tight', pad_inches=0.0)
plt.close()
img = cv2.imread(saveto)
img = align_img(img)
aligned_path = saveto.replace('offline','aligned')
os.makedirs(os.path.dirname(aligned_path), exist_ok=True)
cv2.imwrite(aligned_path, img)
# fig, ax = plt.subplots(figsize=(x_max/dpi, y_max/dpi), dpi=dpi)
# ax.scatter(x_img,y_img, s=1, c='white', linewidths=0, alpha=1)
# ax.set_facecolor((0, 0, 0)) # Set regions with no points to black
# ax.axis('scaled') # {equal, scaled}
# ax.xaxis.set_visible(False) # Do not draw axis tick marks
# ax.yaxis.set_visible(False) # Do not draw axis tick marks
# plt.xlim([0, x_max]) # prevent drawing empty space outside of horizontal FOV
# plt.ylim([0, y_max]) # prevent drawing empty space outside of vertical FOV
# fig.savefig(saveto.replace('depth','sils'), dpi=dpi, bbox_inches='tight', pad_inches=0.0)
# plt.close()
def pcd2depth(img_groups: Tuple, output_path: Path, img_size: int = 64, verbose: bool = False, dataset='CASIAB') -> None:
"""Reads a group of images and saves the data in pickle format.
Args:
img_groups (Tuple): Tuple of (sid, seq, view) and list of image paths.
output_path (Path): Output path.
img_size (int, optional): Image resizing size. Defaults to 64.
verbose (bool, optional): Display debug info. Defaults to False.
"""
sinfo = img_groups[0]
img_paths = img_groups[1]
for img_file in sorted(img_paths):
pcd_name = img_file.split('/')[-1]
pcd = o3d.io.read_point_cloud(img_file)
points = np.asarray(pcd.points)
HRES = 0.19188 # horizontal resolution (assuming 20Hz setting)
VRES = 0.2
VFOV = (-25.0, 15.0) # Field of view (-ve, +ve) along vertical axis
Y_FUDGE = 0 # y fudge factor for velodyne HDL 64E
dst_path = os.path.join(output_path, *sinfo)
os.makedirs(dst_path, exist_ok=True)
dst_path = os.path.join(dst_path,pcd_name)
lidar_to_2d_front_view(points, v_res=VRES, h_res=HRES, v_fov=VFOV, val="depth",
saveto=dst_path, y_fudge=Y_FUDGE)
# if len(points) == 0:
# print(img_file)
# to_pickle.append(points)
# dst_path = os.path.join(output_path, *sinfo)
# os.makedirs(dst_path, exist_ok=True)
# pkl_path = os.path.join(dst_path, f'pcd-{sinfo[2]}.pkl')
# pickle.dump(to_pickle, open(pkl_path, 'wb'))
# if len(to_pickle) < 5:
# logging.warning(f'{sinfo} has less than 5 valid data.')
def pretreat(input_path: Path, output_path: Path, img_size: int = 64, workers: int = 4, verbose: bool = False, dataset: str = 'CASIAB') -> None:
"""Reads a dataset and saves the data in pickle format.
Args:
input_path (Path): Dataset root path.
output_path (Path): Output path.
img_size (int, optional): Image resizing size. Defaults to 64.
workers (int, optional): Number of thread workers. Defaults to 4.
verbose (bool, optional): Display debug info. Defaults to False.
"""
img_groups = defaultdict(list)
logging.info(f'Listing {input_path}')
total_files = 0
for sid in tqdm(sorted(os.listdir(input_path))):
for seq in os.listdir(os.path.join(input_path,sid)):
for view in os.listdir(os.path.join(input_path,sid,seq)):
for img_path in os.listdir(os.path.join(input_path,sid,seq,view,'PCDs')):
img_groups[(sid, seq, view,'PCDs_offline_depths')].append(os.path.join(input_path,sid,seq,view, 'PCDs',img_path))
total_files += 1
logging.info(f'Total files listed: {total_files}')
progress = tqdm(total=len(img_groups), desc='Pretreating', unit='folder')
with mp.Pool(workers) as pool:
logging.info(f'Start pretreating {input_path}')
for _ in pool.imap_unordered(partial(pcd2depth, output_path=output_path, img_size=img_size, verbose=verbose, dataset=dataset), img_groups.items()):
progress.update(1)
logging.info('Done')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='OpenGait dataset pretreatment module.')
parser.add_argument('-i', '--input_path', default='', type=str, help='Root path of raw dataset.')
parser.add_argument('-o', '--output_path', default='', type=str, help='Output path of pickled dataset.')
parser.add_argument('-l', '--log_file', default='./pretreatment.log', type=str, help='Log file path. Default: ./pretreatment.log')
parser.add_argument('-n', '--n_workers', default=4, type=int, help='Number of thread workers. Default: 4')
parser.add_argument('-r', '--img_size', default=64, type=int, help='Image resizing size. Default 64')
parser.add_argument('-d', '--dataset', default='CASIAB', type=str, help='Dataset for pretreatment.')
parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Display debug info.')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, filename=args.log_file, filemode='w', format='[%(asctime)s - %(levelname)s]: %(message)s')
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
logging.info('Verbose mode is on.')
for k, v in args.__dict__.items():
logging.debug(f'{k}: {v}')
pretreat(input_path=Path(args.input_path), output_path=Path(args.output_path), img_size=args.img_size, workers=args.n_workers, verbose=args.verbose, dataset=args.dataset)