[225570]: / vis / inv_pendulum.py

Download this file

348 lines (282 with data), 11.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import math

import numpy as np

from .visual import CocoPart
from helpers import *
from default_params import *
def match_ip(ip_set, new_ips, lstm_set, num_matched, consecutive_frames=DEFAULT_CONSEC_FRAMES):
    """Assign new detections to existing tracks and prune tracks that went empty.

    Each valid entry of ``new_ips`` is attached to the closest not-yet-used
    track in ``ip_set`` whose distance is below ``MIN_THRESH``; otherwise a new
    track (and a matching fresh entry in ``lstm_set``) is appended.  Tracks
    that received nothing this frame get a ``None`` pushed, and tracks that
    consist only of ``None`` are removed from both ``ip_set`` and ``lstm_set``.

    Args:
        ip_set: list of tracks, mutated in place.  Each track is a list of
            ``consecutive_frames`` entries (detections or ``None``).
        new_ips: detections of the current frame.
        lstm_set: per-track state list kept index-aligned with ``ip_set``,
            mutated in place.
        num_matched: tracks with index below this value are counted as
            "matched" (presumably matched across views -- confirm with caller).
        consecutive_frames: window length of every track.

    Returns:
        ``(new_matched, new_len_ip_set, removed_match)``: the updated matched
        count, the resulting number of tracks, and the indices (in the
        pre-removal numbering) of removed tracks that were below
        ``num_matched``.
    """
    len_ip_set = len(ip_set)
    added = [False] * len_ip_set
    new_len_ip_set = len_ip_set

    for new_ip in new_ips:
        if not is_valid(new_ip):
            continue

        # Greedy nearest-track search; only tracks that existed on entry and
        # have not been used in this frame are candidates.
        best_dist, best_idx = MIN_THRESH, -1
        for i in range(len_ip_set):
            if added[i]:
                continue
            candidate_dist = dist(last_ip(ip_set[i])[0], new_ip)
            if candidate_dist < best_dist:
                best_dist, best_idx = candidate_dist, i

        if best_idx == -1:
            ip_set.append([None] * (consecutive_frames - 1) + [new_ip])
            lstm_set.append([None, 0, 0, 0])  # Initial hidden state of lstm is None
            new_len_ip_set += 1
        else:
            added[best_idx] = True
            pop_and_add(ip_set[best_idx], new_ip, consecutive_frames)

    new_matched = num_matched
    removed_indx = []
    removed_match = []
    empty_track = [None] * consecutive_frames

    for i, was_added in enumerate(added):
        if was_added:
            # A track that just received a detection cannot be empty.
            continue
        pop_and_add(ip_set[i], None, consecutive_frames)
        if ip_set[i] == empty_track:
            if i < num_matched:
                new_matched -= 1
                removed_match.append(i)
            new_len_ip_set -= 1
            removed_indx.append(i)

    # removed_indx is ascending; delete from the back so indices stay valid.
    # The LSTM state at the SAME index must go (previously the last state was
    # popped, which left lstm_set misaligned with ip_set).
    for i in reversed(removed_indx):
        ip_set.pop(i)
        lstm_set.pop(i)

    return new_matched, new_len_ip_set, removed_match
def extend_vector(p1, p2, l):
    """Lengthen the segment ``p1``-``p2`` by ``l/2`` at each end, in place.

    Both arrays are modified in place and returned.  The second step uses the
    already-moved ``p1``; since the offset is re-normalised, only the
    direction of ``p1 - p2`` matters there.
    """
    forward = p1 - p2
    p1 += forward * l / (2 * np.linalg.norm(forward, 2))
    backward = p1 - p2
    p2 -= backward * l / (2 * np.linalg.norm(backward, 2))
    return p1, p2
def perp(a):
    """Return ``a`` rotated by +90 degrees: ``(x, y) -> (-y, x)``.

    Only the first two components are written; the result has the same
    shape and dtype as ``a``.
    """
    rotated = np.empty_like(a)
    rotated[0], rotated[1] = -a[1], a[0]
    return rotated
def seg_intersect(a1, a2, b1, b2):
    """Return the intersection point of the lines through two segments.

    Segment a is given by endpoints ``a1``, ``a2`` and segment b by
    endpoints ``b1``, ``b2`` (numpy arrays).  The point is computed for the
    infinite lines, so it may lie outside either segment; no check for
    parallel lines (zero denominator) is made.
    """
    dir_a = a2 - a1
    dir_b = b2 - b1
    offset = a1 - b1
    normal_a = perp(dir_a)
    denom = np.dot(normal_a, dir_b)
    num = np.dot(normal_a, offset)
    scale = num / denom.astype(float)
    return scale * dir_b + b1
def get_kp(kp):
    """Reduce a keypoint set to inverted-pendulum points plus two body boxes.

    ``kp`` is indexed by ``CocoPart`` members; every entry is read as
    ``entry[0]``, ``entry[1]`` (coordinates) and ``entry[2]`` (a score that is
    used as a weight and compared against thresholds).

    Returns:
        ``(inv_pend, ubbox, lbbox)`` where

        * ``inv_pend`` maps ``'H'`` (score-weighted mean of ears and eyes),
          ``'N'`` (shoulder midpoint), ``'B'`` (hip midpoint), ``'KL'`` and
          ``'KR'`` (knees) to a 2-element array, or ``None`` when the
          corresponding score is too low;
        * ``ubbox`` is ``(LS, RS, RB, LB)`` built from the widened shoulder
          and hip segments when both ``'N'`` and ``'B'`` exist;
        * ``lbbox`` is ``(left hip, right hip, KR, KL)`` when ``'B'`` and
          both knees exist.

        Any box that cannot be built is ``([0, 0], [0, 0])``.
    """
    threshold1 = 5e-3

    def is_confident(part):
        # Short-circuit so a missing (None) keypoint is never indexed; the
        # previous all([...]) form evaluated the [2] lookup unconditionally.
        point = kp[part]
        return point is not None and point[2] > threshold1

    # dict of np arrays of coordinates
    inv_pend = {}

    l_ear, l_eye = kp[CocoPart.LEar], kp[CocoPart.LEye]
    r_eye, r_ear = kp[CocoPart.REye], kp[CocoPart.REar]
    numx = (l_ear[2]*l_ear[0] + l_eye[2]*l_eye[0] +
            r_eye[2]*r_eye[0] + r_ear[2]*r_ear[0])
    numy = (l_ear[2]*l_ear[1] + l_eye[2]*l_eye[1] +
            r_eye[2]*r_eye[1] + r_ear[2]*r_ear[1])
    den = l_ear[2] + l_eye[2] + r_eye[2] + r_ear[2]

    if den < HEAD_THRESHOLD:
        inv_pend['H'] = None
    else:
        inv_pend['H'] = np.array([numx/den, numy/den])

    if is_confident(CocoPart.LShoulder) and is_confident(CocoPart.RShoulder):
        inv_pend['N'] = np.array([(kp[CocoPart.LShoulder][0]+kp[CocoPart.RShoulder][0])/2,
                                  (kp[CocoPart.LShoulder][1]+kp[CocoPart.RShoulder][1])/2])
    else:
        inv_pend['N'] = None

    if is_confident(CocoPart.LHip) and is_confident(CocoPart.RHip):
        inv_pend['B'] = np.array([(kp[CocoPart.LHip][0]+kp[CocoPart.RHip][0])/2,
                                  (kp[CocoPart.LHip][1]+kp[CocoPart.RHip][1])/2])
    else:
        inv_pend['B'] = None

    if is_confident(CocoPart.LKnee):
        inv_pend['KL'] = np.array([kp[CocoPart.LKnee][0], kp[CocoPart.LKnee][1]])
    else:
        inv_pend['KL'] = None

    if is_confident(CocoPart.RKnee):
        inv_pend['KR'] = np.array([kp[CocoPart.RKnee][0], kp[CocoPart.RKnee][1]])
    else:
        inv_pend['KR'] = None

    # Placeholders returned whenever a box cannot be built.
    ubbox = ([0, 0], [0, 0])
    lbbox = ([0, 0], [0, 0])
    have_knees = inv_pend['KL'] is not None and inv_pend['KR'] is not None

    if inv_pend['B'] is not None:
        if inv_pend['N'] is not None:
            height = np.linalg.norm(inv_pend['N'] - inv_pend['B'], 2)
            # extend_vector works in place, so hand it private float copies:
            # this keeps kp untouched and avoids in-place casting errors on
            # integer coordinates.
            LS, RS = extend_vector(np.array(kp[CocoPart.LShoulder][:2], dtype=float),
                                   np.array(kp[CocoPart.RShoulder][:2], dtype=float),
                                   height/4)
            LB, RB = extend_vector(np.array(kp[CocoPart.LHip][:2], dtype=float),
                                   np.array(kp[CocoPart.RHip][:2], dtype=float),
                                   height/3)
            ubbox = (LS, RS, RB, LB)
            if have_knees:
                lbbox = (LB, RB, inv_pend['KR'], inv_pend['KL'])
        elif have_knees:
            # No torso height available: use the raw hip points.
            lbbox = (np.array(kp[CocoPart.LHip][:2]), np.array(kp[CocoPart.RHip][:2]),
                     inv_pend['KR'], inv_pend['KL'])

    return inv_pend, ubbox, lbbox
def get_angle(v0, v1):
    """Return the signed angle in radians from 2-D vector ``v0`` to ``v1``.

    The result lies in ``[-pi, pi]`` and is positive for a counter-clockwise
    rotation.  Uses the standard-library ``math`` module because the
    ``np.math`` alias no longer exists in NumPy 2.0.
    """
    return math.atan2(np.linalg.det([v0, v1]), np.dot(v0, v1))
def is_valid(ip):
    """Return True when the ``'B'``, ``'N'`` and ``'H'`` keypoints of ``ip`` are all set.

    ``ip`` must not be ``None`` (checked with an assertion) and must carry a
    ``"keypoints"`` mapping.
    """
    assert ip is not None
    keypoints = ip["keypoints"]
    return all(keypoints[name] is not None for name in ('B', 'N', 'H'))
def get_rot_energy(ip0, ip1):
    """Return a normalised rotational-energy value between two frames.

    For the links B->N (weight 5) and B->H (weight 1) the squared angular
    velocity between ``ip0`` and ``ip1`` is weighted by ``weight * length**2``
    (length taken from ``ip1``); the weighted sum is divided by twice the sum
    of the weights.  Both inputs need ``"time"`` and ``"keypoints"`` entries
    with non-None ``'B'``, ``'N'`` and ``'H'``.
    """
    dt = ip1["time"] - ip0["time"]
    kp0 = ip0["keypoints"]
    kp1 = ip1["keypoints"]

    weighted_energy = 0
    weight_total = 0
    # (weight, link end point); the neck link is processed before the head link.
    for mass, tip in ((5, 'N'), (1, 'H')):
        link_now = kp1[tip] - kp1['B']
        link_before = kp0[tip] - kp0['B']
        length_sq = link_now.dot(link_now)
        omega_sq = (get_angle(link_before, link_now)/dt)**2
        weighted_energy += mass*length_sq*omega_sq
        weight_total += mass*length_sq

    return weighted_energy/(2*weight_total)
def get_angle_vertical(v):
    """Return ``atan2(-v[0], -v[1])`` in radians.

    This is zero when ``v`` points along the negative second axis (upwards
    in image coordinates, where y grows downwards).  Uses the standard-library
    ``math`` module because the ``np.math`` alias no longer exists in NumPy 2.0.
    """
    return math.atan2(-v[0], -v[1])
def get_gf(ip0, ip1, ip2):
    """Return the sum ``Q_RD1 + Q_RD2`` computed from three consecutive frames.

    The body is treated as two links, B->N and N->H (this appears to follow
    a double inverted pendulum model -- confirm against the reference
    derivation).  Link angles are taken from the middle frame ``ip1``;
    angular velocities and accelerations are finite differences over the
    three frames.  Lengths are normalised so that the B->N link has length 1.

    Each input needs ``"time"`` and ``"keypoints"`` entries with non-None
    ``'B'``, ``'N'`` and ``'H'``.

    NOTE(review): the angular acceleration is now divided by the mean time
    step ``0.5*(t1 + t2)``.  The previous expression ``x / 0.5*(t1 + t2)``
    parsed as ``(x / 0.5) * (t1 + t2)``.  Anything calibrated on the old
    values (thresholds, trained models) needs re-checking.
    """
    t1 = ip1["time"] - ip0["time"]
    t2 = ip2["time"] - ip1["time"]
    ip0 = ip0["keypoints"]
    ip1 = ip1["keypoints"]
    ip2 = ip2["keypoints"]

    m1 = 1
    m2 = 15
    g = 10

    # Upper link: neck -> head.
    H2 = ip2['H'] - ip2['N']
    H1 = ip1['H'] - ip1['N']
    H0 = ip0['H'] - ip0['N']
    d1 = np.sqrt(H1.dot(H1))

    # Lower link: hip midpoint -> neck.
    N2 = ip2['N'] - ip2['B']
    N1 = ip1['N'] - ip1['B']
    N0 = ip0['N'] - ip0['B']
    d2 = np.sqrt(N1.dot(N1))

    # Middle-frame angles: theta2 is the lower link against the vertical,
    # theta1 the upper link relative to the lower link.
    theta2 = get_angle_vertical(N1)
    theta1 = get_angle_vertical(H1) - theta2

    # Angular velocities over the two frame intervals.
    del_theta1_0 = (get_angle(H0, H1))/t1
    del_theta1_1 = (get_angle(H1, H2))/t2
    del_theta2_0 = (get_angle(N0, N1))/t1
    del_theta2_1 = (get_angle(N1, N2))/t2

    del_theta1 = 0.5 * (del_theta1_1 + del_theta1_0)
    del_theta2 = 0.5 * (del_theta2_1 + del_theta2_0)

    # Angular accelerations: velocity change divided by the mean time step.
    mean_dt = 0.5 * (t1 + t2)
    doubledel_theta1 = (del_theta1_1 - del_theta1_0) / mean_dt
    doubledel_theta2 = (del_theta2_1 - del_theta2_0) / mean_dt

    # Normalise lengths by the lower link.
    d1 = d1/d2
    d2 = 1

    Q_RD1 = 0
    Q_RD1 += m1 * d1 * doubledel_theta1 * doubledel_theta1
    Q_RD1 += (m1*d1*d1 + m1*d1*d2*np.cos(theta1))*doubledel_theta2
    Q_RD1 += m1*d1*d2*np.sin(theta1)*del_theta2*del_theta2
    Q_RD1 -= m1*g*d2*np.sin(theta1+theta2)

    Q_RD2 = 0
    Q_RD2 += (m1*d1*d1 + m1*d1*d2*np.cos(theta1))*doubledel_theta1
    Q_RD2 += ((m1+m2)*d2*d2 + m1*d1*d1 + 2*m1*d1*d2*np.cos(theta1))*doubledel_theta2
    Q_RD2 -= 2*m1*d1*d2*np.sin(theta1)*del_theta2*del_theta1 + m1*d1 * \
        d2*np.sin(theta1)*del_theta1*del_theta1
    Q_RD2 -= (m1 + m2)*g*d2*np.sin(theta2) + m1*g*d1*np.sin(theta1 + theta2)

    return Q_RD1 + Q_RD2
def get_height_bbox(ip):
    """Return the extent of ``ip["box"]`` along its second coordinate.

    ``ip["box"]`` must be a numpy array whose rows 0 and 1 are two corners;
    the result is ``box[1][1] - box[0][1]``.
    """
    box = ip["box"]
    assert isinstance(box, np.ndarray)
    extent = box[1] - box[0]
    return extent[1]
def get_ratio_bbox(ip):
    """Return the first-axis extent of ``ip["box"]`` divided by its second-axis extent.

    ``ip["box"]`` must be a numpy array whose rows 0 and 1 are two corners.
    A zero second-axis extent is replaced by ``1e5`` times the first-axis
    extent before dividing.  At least one extent must be positive (checked
    with an assertion).
    """
    box = ip["box"]
    assert isinstance(box, np.ndarray)
    extent = box[1] - box[0]
    if extent[1] == 0:
        # Avoid dividing by zero for a degenerate box.
        extent[1] += 1e5*extent[0]
    assert(np.any(extent > 0))
    return extent[0]/extent[1]
def get_ratio_derivative(ip0, ip1):
    """Return the rate of change of the ``ratio_bbox`` feature from ``ip0`` to ``ip1``.

    Both inputs need a ``"time"`` entry and ``["features"]["ratio_bbox"]``.
    The two timestamps must differ (checked with an assertion).
    """
    elapsed = ip1["time"] - ip0["time"]
    ratio_change = ip1["features"]["ratio_bbox"] - ip0["features"]["ratio_bbox"]
    assert elapsed != 0
    return ratio_change/elapsed
def match_ip2(matched_ip_set, unmatched_ip_set, new_ips, re_matrix, gf_matrix, consecutive_frames=DEFAULT_CONSEC_FRAMES):
    """Assign new detections to matched/unmatched tracks and prune empty matched tracks.

    Each valid entry of ``new_ips`` is attached to the closest unused track
    (matched tracks are examined first, then unmatched ones) whose distance
    is below ``MIN_THRESH``; otherwise it starts a new track in
    ``unmatched_ip_set``.  Matched tracks that received nothing get a
    ``None`` pushed and are removed once they contain only ``None``.

    Both track lists are mutated in place and nothing is returned.
    ``re_matrix`` and ``gf_matrix`` are accepted but not used here, and
    tracks in ``unmatched_ip_set`` are neither aged nor pruned here.
    """
    # One "already used this frame" flag per track that existed on entry;
    # tracks appended below are therefore never candidates in this call.
    added_matched = [False] * len(matched_ip_set)
    added_unmatched = [False] * len(unmatched_ip_set)
    pools = ((matched_ip_set, added_matched), (unmatched_ip_set, added_unmatched))

    for detection in new_ips:
        if not is_valid(detection):
            continue

        best_dist = MIN_THRESH
        best_idx = -1
        best_pool = None
        best_flags = None
        for pool, flags in pools:
            for idx, used in enumerate(flags):
                if used:
                    continue
                if dist(last_ip(pool[idx])[0], detection) < best_dist:
                    best_dist = dist(last_ip(pool[idx])[0], detection)
                    best_idx = idx
                    best_pool = pool
                    best_flags = flags

        if best_idx == -1:
            unmatched_ip_set.append([None] * (consecutive_frames - 1) + [detection])
        else:
            best_flags[best_idx] = True
            pop_and_add(best_pool[best_idx], detection, consecutive_frames)

    # Age matched tracks that got no detection and keep only the non-empty ones.
    empty_track = [None] * consecutive_frames
    surviving = []
    for track, used in zip(matched_ip_set, added_matched):
        if not used:
            pop_and_add(track, None, consecutive_frames)
            if track == empty_track:
                continue
        surviving.append(track)
    # Slice assignment keeps the caller's list object.
    matched_ip_set[:] = surviving