--- a +++ b/envs/target/v_tgt_field.py @@ -0,0 +1,371 @@ +# Author(s): Seungmoon Song <seungmoon.song@gmail.com> +""" +... +""" + +from __future__ import division # '/' always means non-truncating division +import numpy as np +from scipy import interpolate + + +class VTgtField(object): + nn_get = np.array([11, 11]) # vtgt_field_local data is nn_get*nn_get = 121 + + ver = {} + # v00: constant forward velocities + ver['ver00'] = {} + ver['ver00']['res_map'] = np.array([2, 2]) + ver['ver00']['rng_xy0'] = np.array([[-20, 20], [-20, 20]]) + ver['ver00']['rng_get'] = np.array([[-5, 5], [-5, 5]]) + ver['ver00']['v_amp_rng'] = {} + ver['ver00']['rng_p_sink_r_th'] = {} + ver['ver00']['r_target'] = {} + ver['ver00']['v_amp'] = np.array([1.4, 0]) + ver['ver00']['n_new_target'] = 1 + + # v01: consecutive sinks forward for walking + ver['ver01'] = {} + ver['ver01']['res_map'] = np.array([2, 2]) + ver['ver01']['rng_xy0'] = np.array([[-20, 20], [-20, 20]]) + ver['ver01']['rng_get'] = np.array([[-5, 5], [-5, 5]]) + ver['ver01']['v_amp_rng'] = np.array([.8, 1.8]) + ver['ver01']['rng_p_sink_r_th'] = np.array([[5, 7], [0, 0]]) + ver['ver01']['r_target'] = .3 + ver['ver01']['n_new_target'] = float("inf") + + # v02: consecutive sinks for walking (-90 < th < 90) + ver['ver02'] = {} + ver['ver02']['res_map'] = np.array([2, 2]) + ver['ver02']['rng_xy0'] = np.array([[-20, 20], [-20, 20]]) + ver['ver02']['rng_get'] = np.array([[-5, 5], [-5, 5]]) + ver['ver02']['v_amp_rng'] = np.array([.8, 1.8]) + ver['ver02']['rng_p_sink_r_th'] = np.array([[5, 7], [-90*np.pi/180, 90*np.pi/180]]) + ver['ver02']['r_target'] = .3 + ver['ver02']['n_new_target'] = float("inf") + + # v03: consecutive sinks for walking (-180 < th < 180) + # vs. v02: larger range; n_target = 2 + ver['ver03'] = {} + ver['ver03']['res_map'] = np.array([2, 2]) + ver['ver03']['rng_xy0'] = np.array([[-20, 20], [-20, 20]]) + ver['ver03']['rng_get'] = np.array([[-5, 5], [-5, 5]]) + ver['ver03']['v_amp_rng'] = np.array([.8, 1.8]) + ver['ver03']['rng_p_sink_r_th'] = np.array([[5, 7], [-180*np.pi/180, 180*np.pi/180]]) + ver['ver03']['r_target'] = .3 + ver['ver03']['n_new_target'] = 2 + + vtgt_space = np.array([ [-10] * 2*11*11, [10] * 2*11*11 ]) + +# ----------------------------------------------------------------------------------------------------------------- + def __init__(self, visualize=True, version=1, dt=.01, dt_visualize=0.5, seed=None): + self.dt = dt + self.visualize = visualize + self.dt_visualize = dt_visualize + self.di_visualize = int(dt_visualize/dt) + + if seed: + np.random.seed(seed) + +# ----------------------------------------------------------------------------------------------------------------- + def reset(self, version=1, seed=None, pose_agent=np.array([0, 0, 0])): + self.t = 0 + self.i = 0 + self.i_target = 1 + self.t_target = 0 + + if version not in [0, 1, 2, 3]: + raise ValueError("vtgt version should be in [0, 1, 2].") + self.ver['version'] = version + # set parameters + s_ver = 'ver{}'.format(str(version).rjust(2,'0')) + self.rng_xy0 = self.ver[s_ver]['rng_xy0'] + self.v_amp_rng = self.ver[s_ver]['v_amp_rng'] + self.rng_p_sink_r_th = self.ver[s_ver]['rng_p_sink_r_th'] + self.r_target = self.ver[s_ver]['r_target'] + self.rng_get = self.ver[s_ver]['rng_get'] + self.res_map = self.ver[s_ver]['res_map'] + self.n_new_target = self.ver[s_ver]['n_new_target'] + + self.res_get = np.array([ (self.rng_get[0,1]-self.rng_get[0,0]+1)/self.nn_get[0], + (self.rng_get[1,1]-self.rng_get[1,0]+1)/self.nn_get[1]]) + self.pose_agent = pose_agent + self.rng_xy = (self.pose_agent[0:2] + self.rng_xy0.T).T + + if self.ver['version'] is 0: + # map coordinate and vtgt + self.vtgt_obj = VTgtConst(v_tgt=self.ver['ver00']['v_amp'], + rng_xy=self.rng_xy, res_map=self.res_map, + rng_get=self.rng_get, res_get=self.res_get) + self.create_vtgt_const(v_tgt=self.ver['ver00']['v_amp']) + elif self.ver['version'] in [1, 2, 3]: + # map coordinate and vtgt + self.vtgt_obj = VTgtSink(rng_xy=self.rng_xy, res_map=self.res_map, + rng_get=self.rng_get, res_get=self.res_get) + + if seed: + np.random.seed(seed) + + # create first sink + del_p_sink_r = np.random.uniform(self.rng_p_sink_r_th[0,0], self.rng_p_sink_r_th[0,1]) + del_p_sink_th = np.random.uniform(self.rng_p_sink_r_th[1,0], self.rng_p_sink_r_th[1,1]) + del_p_sink_x = np.cos(del_p_sink_th)*del_p_sink_r + del_p_sink_y = np.sin(del_p_sink_th)*del_p_sink_r + self.path_th = del_p_sink_th + self.p_sink = self.pose_agent[0:2] + np.array([del_p_sink_x, del_p_sink_y]) + self.create_vtgt_sink(self.v_amp_rng) + + if self.visualize: + import matplotlib.pyplot as plt + if hasattr(self, 'vis'): + self.vis['plt'].close(self.vis['hndl']) + self.vis = {} + self.vis['plt'] = plt + self.vis['hndl'], self.vis['axes'] = self.vis['plt'].subplots(2,1, figsize=(5, 6)) + X = self.vtgt_obj.map[0] + Y = self.vtgt_obj.map[1] + U = self.vtgt_obj.vtgt[0] + V = self.vtgt_obj.vtgt[1] + R = np.sqrt(U**2 + V**2) + self.vis['q0'] = self.vis['axes'][0].quiver(X, Y, U, V, R) + self.vis['axes'][0].axis('equal') + self.vis['axes'][0].set_title('v$_{tgt}$ (global)') + self.vis['axes'][0].set_xlabel('x') + self.vis['axes'][0].set_ylabel('y') + + v_tgt_field = self.vtgt_obj.get_vtgt_field_local(pose_agent) + X, Y = self.vtgt_obj._generate_grid(self.vtgt_obj.rng_get, self.vtgt_obj.res_get) + U = v_tgt_field[0] + V = v_tgt_field[1] + R = np.sqrt(U**2 + V**2) + self.vis['q1'] = self.vis['axes'][1].quiver(X, Y, U, V, R) + self.vis['axes'][1].axis('equal') + self.vis['axes'][1].set_title('v$_{tgt}$ (body)') + self.vis['axes'][1].set_xlabel('forward') + self.vis['axes'][1].set_ylabel('leftward') + + self.vis['plt'].tight_layout() + self.vis['plt'].pause(0.0001) + +# ----------------------------------------------------------------------------------------------------------------- + def create_vtgt_const(self, v_tgt): + self.vtgt_obj.create_vtgt_const(v_tgt) + +# ----------------------------------------------------------------------------------------------------------------- + def create_vtgt_sink(self, v_amp_rng): + d_sink = np.linalg.norm(self.p_sink - self.pose_agent[0:2]) + v_phase0 = np.random.uniform(-np.pi, np.pi) + self.t0_target = np.random.uniform(2, 4) + self.vtgt_obj.create_vtgt_sink(self.p_sink, d_sink, v_amp_rng, v_phase0=v_phase0) + +# ----------------------------------------------------------------------------------------------------------------- + def update(self, pose): + self.t += self.dt + self.i += 1 + + self.pose_agent = pose + + if not hasattr(self, 'p_sink'): + flag_target_achieved = 0 + else: + if np.linalg.norm(self.p_sink - self.pose_agent[0:2]) < self.r_target: + self.t_target += self.dt + else: # reset t_target if agent goes out of + self.t_target = 0 + + flag_target_achieved = 0 + if (self.t_target > self.t0_target # stayed at the target + and self.i_target <= self.n_new_target): # on a new target + if self.i_target < self.n_new_target: # if ... create new target + del_p_sink_r = np.random.uniform(self.rng_p_sink_r_th[0,0], self.rng_p_sink_r_th[0,1]) + del_p_sink_th = np.random.uniform(self.rng_p_sink_r_th[1,0], self.rng_p_sink_r_th[1,1]) + self.path_th += del_p_sink_th + del_p_sink_x = np.cos(self.path_th)*del_p_sink_r + del_p_sink_y = np.sin(self.path_th)*del_p_sink_r + self.p_sink += np.array([del_p_sink_x, del_p_sink_y]) + self.rng_xy = (self.pose_agent[0:2] + self.rng_xy0.T).T + self.vtgt_obj.create_map(self.rng_xy) + self.create_vtgt_sink(self.v_amp_rng) + self.i_target += 1 + self.t_target = 0 + flag_target_achieved = 1 + + v_tgt_field = self.vtgt_obj.get_vtgt_field_local(pose) + if self.visualize: + if flag_target_achieved: + self.vis['q0'].remove() + X = self.vtgt_obj.map[0] + Y = self.vtgt_obj.map[1] + U = self.vtgt_obj.vtgt[0] + V = self.vtgt_obj.vtgt[1] + R = np.sqrt(U**2 + V**2) + self.vis['q0'] = self.vis['axes'][0].quiver(X, Y, U, V, R) + self.vis['axes'][0].axis('equal') + + if self.di_visualize == 1 or self.i%self.di_visualize==1 or self.t == self.dt: + self.vis['axes'][0].plot(pose[0], pose[1], 'k.') + + X, Y = self.vtgt_obj._generate_grid(self.vtgt_obj.rng_get, self.vtgt_obj.res_get) + U = v_tgt_field[0] + V = v_tgt_field[1] + R = np.sqrt(U**2 + V**2) + self.vis['q1'].remove() + self.vis['q1'] = self.vis['axes'][1].quiver(X, Y, U, V, R) + self.vis['axes'][1].plot(0, 0, 'k.') + self.vis['axes'][1].axis('equal') + + self.vis['plt'].pause(0.0001) + + + return v_tgt_field, flag_target_achieved + +# ----------------------------------------------------------------------------------------------------------------- + def get_vtgt(self, xy): + return self.vtgt_obj.get_vtgt(xy) + +# ----------------------------------------------------------------------------------------------------------------- + def get_vtgt_field_local(self, pose): + return self.vtgt_obj.get_vtgt_field_local(pose) + + +class VTgt0(object): +# ----------------------------------------------------------------------------------------------------------------- + def __init__(self, rng_xy=np.array([[-30, 30], [-30, 30]]), res_map=np.array([2, 2]), + rng_get=np.array([[-5, 5], [-5, 5]]), res_get=np.array([2, 2]) ): + # set parameters + self.res_map = res_map + self.res_get = res_get + self.rng_get = rng_get + + # map coordinate and vtgt + self.create_map(rng_xy) + self.vtgt = -1*self.map + +# ----------------------------------------------------------------------------------------------------------------- + def __del__(self): + nn = "empty" + +# ----------------------------------------------------------------------------------------------------------------- + def create_map(self, rng_xy): + self.map_rng_xy = rng_xy + self.map = self._generate_grid(rng_xy, self.res_map) + +# ----------------------------------------------------------------------------------------------------------------- + def _generate_grid(self, rng_xy=np.array([[-10, 10], [-10, 10]]), res=np.array([2, 2])): + xo = .5*(rng_xy[0,0]+rng_xy[0,1]) + x_del = (rng_xy[0,1]-xo)*res[0] + yo = .5*(rng_xy[1,0]+rng_xy[1,1]) + y_del = (rng_xy[1,1]-yo)*res[1] + grid = np.mgrid[-x_del:x_del+1, -y_del:y_del+1] + grid[0] = grid[0]/res[0] + xo + grid[1] = grid[1]/res[1] + yo + return grid + +# ----------------------------------------------------------------------------------------------------------------- + def get_vtgt(self, xy): # in the global frame + vtgt_x = self.vtgt_interp_x(xy[0], xy[1]) + vtgt_y = self.vtgt_interp_y(xy[0], xy[1]) + return np.array([vtgt_x, vtgt_y]) + +# ----------------------------------------------------------------------------------------------------------------- + def get_vtgt_field_local(self, pose): + xy = pose[0:2] + th = pose[2] + + # create query map + get_map0 = self._generate_grid(self.rng_get, self.res_get) + get_map_x = np.cos(th)*get_map0[0,:,:] - np.sin(th)*get_map0[1,:,:] + xy[0] + get_map_y = np.sin(th)*get_map0[0,:,:] + np.cos(th)*get_map0[1,:,:] + xy[1] + + # get vtgt + vtgt_x0 = np.reshape(np.array([self.vtgt_interp_x(x, y) \ + for x, y in zip(get_map_x.flatten(), get_map_y.flatten())]), + get_map_x.shape) + vtgt_y0 = np.reshape(np.array([self.vtgt_interp_y(x, y) \ + for x, y in zip(get_map_x.flatten(), get_map_y.flatten())]), + get_map_y.shape) + + vtgt_x = np.cos(-th)*vtgt_x0 - np.sin(-th)*vtgt_y0 + vtgt_y = np.sin(-th)*vtgt_x0 + np.cos(-th)*vtgt_y0 + + # debug + """ + if xy[0] > 10: + import matplotlib.pyplot as plt + plt.figure(100) + plt.axes([.025, .025, .95, .95]) + plt.plot(get_map_x, get_map_y, '.') + plt.axis('equal') + + plt.figure(101) + plt.axes([.025, .025, .95, .95]) + R = np.sqrt(vtgt_x0**2 + vtgt_y0**2) + plt.quiver(get_map_x, get_map_y, vtgt_x0, vtgt_y0, R) + plt.axis('equal') + + plt.show() + """ + + return np.stack((vtgt_x, vtgt_y)) + + +class VTgtSink(VTgt0): +# ----------------------------------------------------------------------------------------------------------------- + def __init__(self, rng_xy=np.array([[-30, 30], [-30, 30]]), res_map=np.array([2, 2]), + rng_get=np.array([[-5, 5], [-5, 5]]), res_get=np.array([2, 2]) ): + super(VTgtSink, self).__init__(rng_xy=rng_xy, res_map=res_map, + rng_get=rng_get, res_get=res_get) + self.vtgt = -1*self.map + +# ----------------------------------------------------------------------------------------------------------------- + def create_vtgt_sink(self, p_sink, d_sink, v_amp_rng, v_phase0=np.random.uniform(-np.pi, np.pi)): + # set vtgt orientations + rng_xy = (-p_sink + self.map_rng_xy.T).T + self.vtgt = -self._generate_grid(rng_xy, self.res_map) + + # set vtgt amplitudes + self._set_sink_vtgt_amp(p_sink, d_sink, v_amp_rng, v_phase0) + + self.vtgt_interp_x = interpolate.interp2d(self.map[0,:,0], self.map[1,0,:], self.vtgt[0].T, kind='linear') + self.vtgt_interp_y = interpolate.interp2d(self.map[0,:,0], self.map[1,0,:], self.vtgt[1].T, kind='linear') + +# ----------------------------------------------------------------------------------------------------------------- + def _set_sink_vtgt_amp(self, p_sink, d_sink, v_amp_rng, v_phase0, d_dec = 1): + # d_dec: start to decelerate within d_dec of sink + + for i_x, x in enumerate(self.map[0,:,0]): + for i_y, y in enumerate(self.map[1,0,:]): + d = np.linalg.norm([ x-p_sink[0], y-p_sink[1] ]) + if d > d_sink + d_dec: + v_amp = v_amp_rng[1] + elif d > d_dec: + v_phase = v_phase0 + d/d_sink*2*np.pi + v_amp = .5*np.diff(v_amp_rng)*np.sin(v_phase) + np.mean(v_amp_rng) + else: + v_phase = v_phase0 + d_dec/d_sink*2*np.pi + v_amp0 = .5*np.diff(v_amp_rng)*np.sin(v_phase) + np.mean(v_amp_rng) + v_amp = d*v_amp0 + + amp_norm = np.linalg.norm(self.vtgt[:,i_x,i_y]) + self.vtgt[0,i_x,i_y] = v_amp*self.vtgt[0,i_x,i_y]/amp_norm + self.vtgt[1,i_x,i_y] = v_amp*self.vtgt[1,i_x,i_y]/amp_norm + + +class VTgtConst(VTgt0): +# ----------------------------------------------------------------------------------------------------------------- + def __init__(self, v_tgt=np.array([1.4, 0]), + rng_xy=np.array([[-30, 30], [-30, 30]]), res_map=np.array([2, 2]), + rng_get=np.array([[-5, 5], [-5, 5]]), res_get=np.array([2, 2]) ): + super(VTgtConst, self).__init__(rng_xy=rng_xy, res_map=res_map, + rng_get=rng_get, res_get=res_get) + self.vtgt = 1*self.map + self.vtgt[0].fill(v_tgt[0]) + self.vtgt[1].fill(v_tgt[1]) + +# ----------------------------------------------------------------------------------------------------------------- + def create_vtgt_const(self, v_tgt): + self.vtgt[0].fill(v_tgt[0]) + self.vtgt[1].fill(v_tgt[1]) + + self.vtgt_interp_x = interpolate.interp2d(self.map[0,:,0], self.map[1,0,:], self.vtgt[0].T, kind='linear') + self.vtgt_interp_y = interpolate.interp2d(self.map[0,:,0], self.map[1,0,:], self.vtgt[1].T, kind='linear') +