SAC/IK_Framework_Mujoco.py
#Minimal implementation of the RL_Framework_Mujoco environment for CMA-ES and IK Optimization,
#for finding the initial pose of the musculoskeletal model
#and for trajectory visualization

from gym import error, spaces
from gym.utils import seeding
import numpy as np
from os import path
import gym
import pickle

from gym import utils
from . import sensory_feedback_specs
from . import kinematics_preprocessing_specs

import ipdb

try:
    import mujoco_py
except ImportError as e:
    raise error.DependencyNotInstalled("{}. (HINT: you need to install mujoco_py, and also perform the setup instructions here: https://github.com/openai/mujoco-py/.)".format(e))

DEFAULT_SIZE = 500

class MujocoEnv(gym.Env):
    """Superclass for all MuJoCo environments."""

    def __init__(self, model_path, condition_to_sim, cond_tpoint, args):
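        """
        Arguments:
            model_path: path to the MuJoCo XML model of the musculoskeletal system.
            condition_to_sim: index of the kinematic condition to simulate.
            cond_tpoint: timepoint within that condition at which the targets are placed.
            args: configuration namespace; the attributes read below (paths, sensory
                  feedback flags, sim_dt, n_fixedsteps, trajectory_scaling, center)
                  must be present.
        """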

        self.model_path = model_path
        self.initial_pose_path = args.initial_pose_path
        self.kinematics_path = args.kinematics_path
        self.cond_to_sim = condition_to_sim
        self.cond_tpoint = cond_tpoint

        #setup the model
        self.model = mujoco_py.load_model_from_path(model_path)

        #setup the simulator and data object
        self.sim = mujoco_py.MjSim(self.model)
        self.data = self.sim.data

        #Save all the sensory feedback specs for use in the later functions
        self.sfs_proprioceptive_feedback = args.proprioceptive_feedback
        self.sfs_muscle_forces = args.muscle_forces
        self.sfs_joint_feedback = args.joint_feedback
        self.sfs_visual_feedback = args.visual_feedback
        self.sfs_visual_feedback_bodies = args.visual_feedback_bodies
        self.sfs_visual_distance_bodies = args.visual_distance_bodies
        self.sfs_visual_velocity = args.visual_velocity
        self.sfs_sensory_delay_timepoints = args.sensory_delay_timepoints

        #Load the experimental kinematics (x and y coordinates) from the data
        with open(self.kinematics_path + '/kinematics.pkl', 'rb') as f:
            kin_train_test = pickle.load(f)

        kin_train = kin_train_test['train']  #[num_conds][num_targets, num_coords, timepoints]

        #Number of targets, taken from condition 0
        num_targets = kin_train[0].shape[0]

        #Find the qpos indices corresponding to the musculoskeletal model and the targets
        self.qpos_idx_musculo = np.array(list(range(0, self.model.nq)))
        self.qpos_idx_targets = []
        for musculo_target in kinematics_preprocessing_specs.musculo_target_joints:
            joint_idx = self.model.get_joint_qpos_addr(musculo_target)
            self.qpos_idx_targets.append(joint_idx)

        #Remove the target indices from qpos_idx_musculo
        self.qpos_idx_musculo = np.delete(self.qpos_idx_musculo, self.qpos_idx_targets).tolist()

        #Set the simulation timestep
        if args.sim_dt != 0:
            self.model.opt.timestep = args.sim_dt

        self.n_fixedsteps = args.n_fixedsteps
        self.radius = args.trajectory_scaling
        self.center = args.center

        #Kinematics preprocessing for training and testing kinematics
        #Preprocess training kinematics: scale by the per-target radius and shift by the per-target center
        for i_target in range(kin_train[0].shape[0]):
            for i_cond in range(len(kin_train)):
                for i_coord in range(kin_train[i_cond].shape[1]):
                    kin_train[i_cond][i_target, i_coord, :] = kin_train[i_cond][i_target, i_coord, :] / self.radius[i_target]
                    kin_train[i_cond][i_target, i_coord, :] = kin_train[i_cond][i_target, i_coord, :] + self.center[i_target][i_coord]
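
        #Note: the scaling above assumes args.trajectory_scaling is indexable per target
        #and args.center is indexable per target and per coordinate, e.g. (illustrative
        #values only) radius = [100.0] and center = [(0.0, 0.5, 0.0)] for a single target.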

        self.kin_train = kin_train
        self.kin_to_sim = self.kin_train

        self.current_cond_to_sim = self.cond_to_sim

        self.upd_theta(self.cond_tpoint)

        self.viewer = None
        self._viewers = {}

        self.metadata = {
            'render.modes': ['human', 'rgb_array', 'depth_array'],
            'video.frames_per_second': int(np.round(1.0 / self.dt))
        }

        #init_qpos and init_qvel do not contain the target qpos
        #and contain only the musculo qpos
        #If the initial qpos and qvel are provided by the user, load them
        if path.isfile(self.initial_pose_path + '/qpos.npy'):
            init_qpos = np.load(self.initial_pose_path + '/qpos.npy')
            init_qvel = np.load(self.initial_pose_path + '/qvel.npy')

        #else use the default initial pose of the xml model
        else:
            init_qpos = self.sim.data.qpos.flat.copy()[self.qpos_idx_musculo]
            init_qvel = self.sim.data.qvel.flat.copy()[self.qpos_idx_musculo]

        #Get the qpos of musculo + targets
        musculo_qpos = self.sim.data.qpos.flat.copy()
        musculo_qvel = self.sim.data.qvel.flat.copy()

        #Set the musculo part to the saved initial qpos
        musculo_qpos[self.qpos_idx_musculo] = init_qpos

        self.init_qpos = musculo_qpos

        #Set the state to the initial pose
        self.set_state(self.init_qpos)

        if self.sfs_visual_feedback == True:

            #Save the xpos of the musculo bodies for the visual velocities
            if len(self.sfs_visual_velocity) != 0:
                self.prev_body_xpos = []
                for musculo_body in self.sfs_visual_velocity:
                    body_xpos = self.sim.data.get_body_xpos(musculo_body)
                    self.prev_body_xpos = [*self.prev_body_xpos, *body_xpos]

    #Return the qpos of only the musculoskeletal bodies and not the targets
    def get_musculo_state(self):
        qpos_all = self.sim.data.qpos.flat.copy()
        qpos_musculo = qpos_all[self.qpos_idx_musculo]

        return qpos_musculo

    def set_state(self, qpos):
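        """Set the full qpos of the simulation and zero all joint velocities.

        Note: the zero velocity vector is built as qpos*0, which assumes
        model.nq == model.nv (i.e. no free or ball joints).
        """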

        assert qpos.shape == (self.model.nq, )

        old_state = self.sim.get_state()

        new_state = mujoco_py.MjSimState(old_state.time, qpos, qpos*0,
                                         old_state.act, old_state.udd_state)

        self.sim.set_state(new_state)
        self.sim.forward()

    def set_state_musculo(self, qpos):
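        """Set only the musculoskeletal part of qpos, leaving the target joints
        at their current values; all joint velocities are zeroed."""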

        old_state = self.sim.get_state()
        qpos_all = self.sim.data.qpos.flat.copy()
        qpos_all[self.qpos_idx_musculo] = qpos

        assert qpos_all.shape == (self.model.nq, )

        new_state = mujoco_py.MjSimState(old_state.time, qpos_all, qpos_all*0,
                                         old_state.act, old_state.udd_state)

        self.sim.set_state(new_state)
        self.sim.forward()

    def get_obs_musculo_bodies(self):
        #Returns the current xyz coords of the musculo bodies to be tracked
        musculo_body_state = []
        for musculo_body in kinematics_preprocessing_specs.musculo_tracking:
            current_xyz_coord = self.sim.data.get_body_xpos(musculo_body[0]).flat.copy()
            musculo_body_state.append(current_xyz_coord)

        return np.array(musculo_body_state)  #[n_musculo_bodies, 3]

    def get_obs_targets(self):
        #Returns the current xyz coords of the targets to be tracked
        musculo_target_state = []
        for musculo_body in kinematics_preprocessing_specs.musculo_tracking:
            current_xyz_coord = self.sim.data.get_body_xpos(musculo_body[1]).flat.copy()
            musculo_target_state.append(current_xyz_coord)

        return np.array(musculo_target_state)  #[n_musculo_targets, 3]

    def set_cond_to_simulate(self, i_condition, cond_timepoint):
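        """Switch the environment to condition i_condition and move the targets to
        the kinematics at cond_timepoint."""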

        #update the state variables
        self.cond_to_sim = i_condition
        self.cond_tpoint = cond_timepoint

        self.current_cond_to_sim = self.cond_to_sim

        self.upd_theta(self.cond_tpoint)

    @property
    def dt(self):
        return self.model.opt.timestep

    def render(self,
               mode='human',
               width=DEFAULT_SIZE,
               height=DEFAULT_SIZE,
               camera_id=0,
               camera_name=None):
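        """Render the simulation in 'human', 'rgb_array' or 'depth_array' mode
        (see self.metadata['render.modes'])."""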
        if mode == 'rgb_array' or mode == 'depth_array':
            if camera_id is not None and camera_name is not None:
                raise ValueError("Both `camera_id` and `camera_name` cannot be"
                                 " specified at the same time.")

            no_camera_specified = camera_name is None and camera_id is None
            if no_camera_specified:
                camera_name = 'track'

            if camera_id is None and camera_name in self.model._camera_name2id:
                camera_id = self.model.camera_name2id(camera_name)

            self._get_viewer(mode).render(width, height, camera_id=camera_id)

        if mode == 'rgb_array':
            # window size used for old mujoco-py:
            data = self._get_viewer(mode).read_pixels(width, height, depth=False)
            # original image is upside-down, so flip it
            return data[::-1, :, :]
        elif mode == 'depth_array':
            self._get_viewer(mode).render(width, height)
            # window size used for old mujoco-py:
            # Extract depth part of the read_pixels() tuple
            data = self._get_viewer(mode).read_pixels(width, height, depth=True)[1]
            # original image is upside-down, so flip it
            return data[::-1, :]
        elif mode == 'human':
            self._get_viewer(mode).render()

    def close(self):
        if self.viewer is not None:
            # self.viewer.finish()
            self.viewer = None
            self._viewers = {}

    def _get_viewer(self, mode):
        self.viewer = self._viewers.get(mode)
        if self.viewer is None:
            if mode == 'human':
                self.viewer = mujoco_py.MjViewer(self.sim)
            elif mode == 'rgb_array' or mode == 'depth_array':
                self.viewer = mujoco_py.MjRenderContextOffscreen(self.sim, -1)

            self.viewer_setup()
            self._viewers[mode] = self.viewer
        return self.viewer

    def get_body_com(self, body_name):
        return self.data.get_body_xpos(body_name).copy()


class Muscle_Env(MujocoEnv):

    def __init__(self, model_path, condition_to_sim, cond_tpoint, args):
        MujocoEnv.__init__(self, model_path, condition_to_sim, cond_tpoint, args)

    def viewer_setup(self):
        self.viewer.cam.trackbodyid = 0

    def upd_theta(self, cond_timepoint):
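        """Move the target joints (box:x{i}, box:y{i}, box:z{i}) to the kinematic
        coordinates of the current condition at the given timepoint. Which of the
        x/y/z joints exist for each target is given by
        kinematics_preprocessing_specs.xyz_target."""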

        coords_to_sim = self.kin_to_sim[self.current_cond_to_sim]  #[num_targets, num_coords, timepoints]

        assert cond_timepoint < coords_to_sim.shape[-1]

        crnt_state = self.sim.get_state()

        #The coordinate rows of coords_to_sim are stacked in x, y, z order, keeping only
        #the coordinates flagged in xyz_target; hence the row offsets below.
        for i_target in range(self.kin_to_sim[self.current_cond_to_sim].shape[0]):
            if kinematics_preprocessing_specs.xyz_target[i_target][0]:
                x_joint_idx = self.model.get_joint_qpos_addr(f"box:x{i_target}")
                crnt_state.qpos[x_joint_idx] = coords_to_sim[i_target, 0, cond_timepoint]

            if kinematics_preprocessing_specs.xyz_target[i_target][1]:
                y_joint_idx = self.model.get_joint_qpos_addr(f"box:y{i_target}")
                crnt_state.qpos[y_joint_idx] = coords_to_sim[i_target, kinematics_preprocessing_specs.xyz_target[i_target][0], cond_timepoint]

            if kinematics_preprocessing_specs.xyz_target[i_target][2]:
                z_joint_idx = self.model.get_joint_qpos_addr(f"box:z{i_target}")
                crnt_state.qpos[z_joint_idx] = coords_to_sim[i_target, kinematics_preprocessing_specs.xyz_target[i_target][0] + kinematics_preprocessing_specs.xyz_target[i_target][1], cond_timepoint]

        #Now set the state
        self.set_state(crnt_state.qpos)

    def _get_obs(self):
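        """Assemble the sensory feedback vector in the order: proprioceptive feedback
        (muscle lengths and velocities), muscle forces, joint feedback (qpos, qvel),
        and visual feedback (body positions, pairwise distances, velocities),
        including only the modalities enabled in the args flags."""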

        sensory_feedback = []
        if self.sfs_proprioceptive_feedback == True:
            muscle_lens = self.sim.data.actuator_length.flat.copy()
            muscle_vels = self.sim.data.actuator_velocity.flat.copy()

            #process through the given function for muscle lens and muscle vels
            muscle_lens, muscle_vels = sensory_feedback_specs.process_proprioceptive(muscle_lens, muscle_vels)
            sensory_feedback = [*sensory_feedback, *muscle_lens, *muscle_vels]

        if self.sfs_muscle_forces == True:
            actuator_forces = self.sim.data.qfrc_actuator.flat.copy()

            #process the muscle force feedback
            actuator_forces = sensory_feedback_specs.process_muscle_forces(actuator_forces)
            sensory_feedback = [*sensory_feedback, *actuator_forces]

        if self.sfs_joint_feedback == True:
            sensory_qpos = self.sim.data.qpos.flat.copy()
            sensory_qvel = self.sim.data.qvel.flat.copy()

            sensory_qpos, sensory_qvel = sensory_feedback_specs.process_joint_feedback(sensory_qpos, sensory_qvel)
            sensory_feedback = [*sensory_feedback, *sensory_qpos, *sensory_qvel]

        if self.sfs_visual_feedback == True:

            #Check that the user specified the musculo bodies to be included
            assert len(self.sfs_visual_feedback_bodies) != 0

            visual_xyz_coords = []
            for musculo_body in self.sfs_visual_feedback_bodies:
                visual_xyz_coords = [*visual_xyz_coords, *self.sim.data.get_body_xpos(musculo_body)]

            visual_xyz_coords = sensory_feedback_specs.process_visual_position(visual_xyz_coords)
            sensory_feedback = [*sensory_feedback, *visual_xyz_coords]

            if len(self.sfs_visual_distance_bodies) != 0:
                visual_xyz_distance = []
                for musculo_tuple in self.sfs_visual_distance_bodies:
                    body0_xyz = self.sim.data.get_body_xpos(musculo_tuple[0])
                    body1_xyz = self.sim.data.get_body_xpos(musculo_tuple[1])
                    tuple_dist = np.abs(body0_xyz - body1_xyz).tolist()
                    visual_xyz_distance = [*visual_xyz_distance, *tuple_dist]

                #process the visual distance feedback
                visual_xyz_distance = sensory_feedback_specs.process_visual_distance(visual_xyz_distance)
                sensory_feedback = [*sensory_feedback, *visual_xyz_distance]

            #Use the saved xpos of the musculo bodies for the visual velocities
            if len(self.sfs_visual_velocity) != 0:

                #Find the visual vels after the simulation
                current_body_xpos = []
                for musculo_body in self.sfs_visual_velocity:
                    body_xpos = self.sim.data.get_body_xpos(musculo_body)
                    current_body_xpos = [*current_body_xpos, *body_xpos]

                #Find the velocity
                visual_vels = (np.abs(np.array(self.prev_body_xpos) - np.array(current_body_xpos)) / self.dt).tolist()

                #process visual velocity feedback
                visual_vels = sensory_feedback_specs.process_visual_velocity(visual_vels)
                sensory_feedback = [*sensory_feedback, *visual_vels]

                #update the self.prev_body_xpos
                self.prev_body_xpos = current_body_xpos

        return np.array(sensory_feedback)
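

#Example usage sketch (not part of the original module). It assumes an
#argparse-style `args` namespace providing the attributes read in
#MujocoEnv.__init__ (initial_pose_path, kinematics_path, the sensory feedback
#flags, sim_dt, n_fixedsteps, trajectory_scaling, center), a MuJoCo XML model
#of the musculoskeletal system, and a kinematics.pkl file; paths and values
#below are illustrative.
#
#   env = Muscle_Env("path/to/musculo_model.xml", condition_to_sim=0, cond_tpoint=0, args=args)
#   obs = env._get_obs()                        #assemble the sensory feedback vector
#   qpos_musculo = env.get_musculo_state()      #musculoskeletal qpos without the targets
#   env.set_cond_to_simulate(i_condition=1, cond_timepoint=0)
#   env.render()                                #requires a working mujoco_py viewer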