Switch to unified view

a b/SAC/IK_Framework_Mujoco.py
1
#Minimal implementation of the RL_Framework_Mujoco environment for CMA-ES and IK Optimization
2
#for finding the inital pose of the musculoskeletal model
3
#and for trajectory visualization
4
5
from gym import error, spaces
6
from gym.utils import seeding
7
import numpy as np
8
from os import path
9
import gym
10
import pickle
11
12
import numpy as np
13
from gym import utils
14
from . import sensory_feedback_specs
15
from . import kinematics_preprocessing_specs
16
17
import ipdb
18
19
try:
20
    import mujoco_py
21
except ImportError as e:
22
    raise error.DependencyNotInstalled("{}. (HINT: you need to install mujoco_py, and also perform the setup instructions here: https://github.com/openai/mujoco-py/.)".format(e))
23
24
DEFAULT_SIZE = 500
25
26
class MujocoEnv(gym.Env):
27
    """Superclass for all MuJoCo environments.
28
    """
29
    def __init__(self, model_path, condition_to_sim, cond_tpoint, args):
30
31
        self.model_path = model_path
32
        self.initial_pose_path = args.initial_pose_path
33
        self.kinematics_path = args.kinematics_path
34
        self.cond_to_sim = condition_to_sim
35
        self.cond_tpoint = cond_tpoint
36
37
         #setup the model
38
        self.model = mujoco_py.load_model_from_path(model_path)
39
40
        #setup the simulator and data object
41
        self.sim = mujoco_py.MjSim(self.model)
42
        self.data = self.sim.data
43
44
        #Save all the sensory feedback specs for use in the later functions
45
        self.sfs_proprioceptive_feedback = args.proprioceptive_feedback
46
        self.sfs_muscle_forces = args.muscle_forces
47
        self.sfs_joint_feedback = args.joint_feedback
48
        self.sfs_visual_feedback = args.visual_feedback
49
        self.sfs_visual_feedback_bodies = args.visual_feedback_bodies
50
        self.sfs_visual_distance_bodies = args.visual_distance_bodies
51
        self.sfs_visual_velocity = args.visual_velocity
52
        self.sfs_sensory_delay_timepoints = args.sensory_delay_timepoints
53
54
        #Load the kin_train
55
        # Load the experimental kinematics x and y coordinates from the data
56
        with open(self.kinematics_path + '/kinematics.pkl', 'rb') as f:
57
            kin_train_test = pickle.load(f)
58
59
        kin_train = kin_train_test['train'] #[num_conds][num_targets, num_coords, timepoints]
60
61
        #Randomly sample the number of targets from a condition = 0
62
        num_targets = kin_train[0].shape[0]
63
64
        #Find the qpos corresponding to the musculoskeletal model and targets
65
        self.qpos_idx_musculo = np.array(list(range(0, self.model.nq)))
66
        self.qpos_idx_targets = []
67
        for musculo_targets in kinematics_preprocessing_specs.musculo_target_joints:
68
            joint_idx = self.model.get_joint_qpos_addr(musculo_targets)
69
            self.qpos_idx_targets.append(joint_idx)
70
            
71
        #Delete the corresponding index from the qpos_idx_musculo
72
        self.qpos_idx_musculo = np.delete(self.qpos_idx_musculo, self.qpos_idx_targets).tolist()
73
74
        #Set the simulation timestep
75
        if args.sim_dt != 0:
76
            self.model.opt.timestep = args.sim_dt
77
78
79
        self.n_fixedsteps = args.n_fixedsteps
80
        self.radius = args.trajectory_scaling
81
        self.center = args.center
82
83
        #Kinematics preprocessing for training and testing kinematics
84
        #Preprocess training kinematics
85
        for i_target in range(kin_train[0].shape[0]):
86
            for i_cond in range(len(kin_train)):
87
                for i_coord in range(kin_train[i_cond].shape[1]):
88
                    kin_train[i_cond][i_target, i_coord, :] = kin_train[i_cond][i_target, i_coord, :] / self.radius[i_target]
89
                    kin_train[i_cond][i_target, i_coord, :] = kin_train[i_cond][i_target, i_coord, :] + self.center[i_target][i_coord]
90
91
92
        self.kin_train = kin_train
93
        self.kin_to_sim = self.kin_train
94
95
        self.current_cond_to_sim = self.cond_to_sim
96
97
        self.upd_theta(self.cond_tpoint)
98
99
100
        self.viewer = None 
101
        self._viewers = {}
102
103
        self.metadata = {
104
            'render.modes': ['human', 'rgb_array', 'depth_array'],
105
            'video.frames_per_second': int(np.round(1.0 / self.dt))
106
        }
107
108
        #init_qpos and init_qvel do not contain target qpos
109
        #and contain only the musculo qpos
110
        #If the initial qpos and qvel are provided by the user
111
        if path.isfile(self.initial_pose_path + '/qpos.npy'):
112
            init_qpos = np.load(self.initial_pose_path + '/qpos.npy')
113
            init_qvel = np.load(self.initial_pose_path + '/qvel.npy')
114
        
115
        #else use the default initial pose of xml model
116
        else:
117
            init_qpos = self.sim.data.qpos.flat.copy()[self.qpos_idx_musculo]
118
            init_qvel = self.sim.data.qvel.flat.copy()[self.qpos_idx_musculo]
119
120
        #Get the qpos of musclo + targets
121
        musculo_qpos = self.sim.data.qpos.flat.copy() 
122
        musculo_qvel = self.sim.data.qvel.flat.copy()
123
124
        #Set the musculo part to the saved initial qpos and qvel
125
        musculo_qpos[self.qpos_idx_musculo] = init_qpos
126
127
        self.init_qpos = musculo_qpos
128
129
        #Set the initial state to init_qpos
130
        #Set the state to the initial pose
131
        self.set_state(self.init_qpos)
132
133
        if self.sfs_visual_feedback == True:
134
135
            #Save the xpos of the musculo bodies for visual vels
136
            if len(self.sfs_visual_velocity) != 0:
137
                self.prev_body_xpos = []
138
                for musculo_body in self.sfs_visual_velocity:
139
                    body_xpos = self.sim.data.get_body_xpos(musculo_body)
140
                    self.prev_body_xpos = [*self.prev_body_xpos, *body_xpos]
141
142
    #Return the qpos of only the musculoskeletal bodies and not the targets
143
    def get_musculo_state(self):
144
        qpos_all = self.sim.data.qpos.flat.copy()
145
        qpos_musculo = qpos_all[self.qpos_idx_musculo]
146
147
        return qpos_musculo
148
   
149
150
    def set_state(self, qpos):
151
        
152
        assert qpos.shape == (self.model.nq, )
153
        
154
        old_state= self.sim.get_state()
155
156
        new_state= mujoco_py.MjSimState(old_state.time, qpos, qpos*0,
157
                                        old_state.act, old_state.udd_state)
158
159
        self.sim.set_state(new_state)
160
        self.sim.forward()
161
162
    def set_state_musculo(self, qpos):
163
        
164
        old_state= self.sim.get_state()
165
        qpos_all = self.sim.data.qpos.flat.copy()
166
        qpos_all[self.qpos_idx_musculo] = qpos
167
168
        assert qpos_all.shape == (self.model.nq, )
169
170
        new_state= mujoco_py.MjSimState(old_state.time, qpos_all, qpos_all*0,
171
                                        old_state.act, old_state.udd_state)
172
173
        self.sim.set_state(new_state)
174
        self.sim.forward()
175
176
    def get_obs_musculo_bodies(self):
177
        #Returns the current xyz coords of the musculo bodies to be tracked
178
        musculo_body_state = []
179
        for musculo_body in kinematics_preprocessing_specs.musculo_tracking:
180
            current_xyz_coord = self.sim.data.get_body_xpos(musculo_body[0]).flat.copy() 
181
            musculo_body_state.append(current_xyz_coord)
182
183
        return np.array(musculo_body_state)  #[n_musculo_bodies, 3]
184
185
186
    def get_obs_targets(self):
187
        #Returns the current xyz coords of the targets to be tracked
188
        musculo_target_state = []
189
        for musculo_body in kinematics_preprocessing_specs.musculo_tracking:
190
            current_xyz_coord = self.sim.data.get_body_xpos(musculo_body[1]).flat.copy() 
191
            musculo_target_state.append(current_xyz_coord)
192
193
        return np.array(musculo_target_state)  #[n_musculo_targets, 3]
194
195
    def set_cond_to_simulate(self, i_condition, cond_timepoint):
196
197
        #update the state variables
198
        self.cond_to_sim = i_condition
199
        self.cond_tpoint = cond_timepoint
200
201
        self.current_cond_to_sim = self.cond_to_sim
202
203
        self.upd_theta(self.cond_tpoint)
204
205
206
    @property
207
    def dt(self):
208
        return self.model.opt.timestep 
209
210
    def render(self,
211
               mode='human',
212
               width=DEFAULT_SIZE,
213
               height=DEFAULT_SIZE,
214
               camera_id=0,
215
               camera_name=None):
216
        if mode == 'rgb_array' or mode == 'depth_array':
217
            if camera_id is not None and camera_name is not None:
218
                raise ValueError("Both `camera_id` and `camera_name` cannot be"
219
                                 " specified at the same time.")
220
221
            no_camera_specified = camera_name is None and camera_id is None
222
            if no_camera_specified:
223
                camera_name = 'track'
224
225
            if camera_id is None and camera_name in self.model._camera_name2id:
226
                camera_id = self.model.camera_name2id(camera_name)
227
228
            self._get_viewer(mode).render(width, height, camera_id=camera_id)
229
230
        if mode == 'rgb_array':
231
            # window size used for old mujoco-py:
232
            data = self._get_viewer(mode).read_pixels(width, height, depth=False)
233
            # original image is upside-down, so flip it
234
            return data[::-1, :, :]
235
        elif mode == 'depth_array':
236
            self._get_viewer(mode).render(width, height)
237
            # window size used for old mujoco-py:
238
            # Extract depth part of the read_pixels() tuple
239
            data = self._get_viewer(mode).read_pixels(width, height, depth=True)[1]
240
            # original image is upside-down, so flip it
241
            return data[::-1, :]
242
        elif mode == 'human':
243
            self._get_viewer(mode).render()
244
245
    def close(self):
246
        if self.viewer is not None:
247
            # self.viewer.finish()
248
            self.viewer = None
249
            self._viewers = {}
250
251
    def _get_viewer(self, mode):
252
        self.viewer = self._viewers.get(mode)
253
        if self.viewer is None:
254
            if mode == 'human':
255
                self.viewer = mujoco_py.MjViewer(self.sim)
256
            elif mode == 'rgb_array' or mode == 'depth_array':
257
                self.viewer = mujoco_py.MjRenderContextOffscreen(self.sim, -1)
258
259
            self.viewer_setup()
260
            self._viewers[mode] = self.viewer
261
        return self.viewer
262
263
    def get_body_com(self, body_name):
264
        return self.data.get_body_xpos(body_name).copy()
265
266
267
class Muscle_Env(MujocoEnv):
268
269
    def __init__(self, model_path, condition_to_sim, cond_tpoint, args):
270
        MujocoEnv.__init__(self, model_path, condition_to_sim, cond_tpoint, args)
271
272
    def viewer_setup(self):
273
        self.viewer.cam.trackbodyid = 0
274
275
    def upd_theta(self, cond_timepoint):
276
277
        coords_to_sim = self.kin_to_sim[self.current_cond_to_sim] #[num_targets, num_coords, timepoints]
278
279
        assert cond_timepoint < coords_to_sim.shape[-1]
280
        
281
        crnt_state = self.sim.get_state()
282
283
        for i_target in range(self.kin_to_sim[self.current_cond_to_sim].shape[0]):
284
            if kinematics_preprocessing_specs.xyz_target[i_target][0]:
285
                x_joint_idx= self.model.get_joint_qpos_addr(f"box:x{i_target}")
286
                crnt_state.qpos[x_joint_idx] = coords_to_sim[i_target, 0, cond_timepoint]
287
288
289
            if kinematics_preprocessing_specs.xyz_target[i_target][1]:
290
                y_joint_idx= self.model.get_joint_qpos_addr(f"box:y{i_target}")
291
                crnt_state.qpos[y_joint_idx] = coords_to_sim[i_target, kinematics_preprocessing_specs.xyz_target[i_target][0], cond_timepoint]
292
293
            if kinematics_preprocessing_specs.xyz_target[i_target][2]:
294
                z_joint_idx= self.model.get_joint_qpos_addr(f"box:z{i_target}")
295
                crnt_state.qpos[z_joint_idx] = coords_to_sim[i_target, kinematics_preprocessing_specs.xyz_target[i_target][0] + kinematics_preprocessing_specs.xyz_target[i_target][1], cond_timepoint]
296
297
298
        #Now set the state
299
        self.set_state(crnt_state.qpos)
300
301
302
    def _get_obs(self):
303
        
304
        sensory_feedback = []
305
        if self.sfs_proprioceptive_feedback == True:
306
            muscle_lens = self.sim.data.actuator_length.flat.copy()
307
            muscle_vels = self.sim.data.actuator_velocity.flat.copy()
308
309
            #process through the given function for muscle lens and muscle vels
310
            muscle_lens, muscle_vels = sensory_feedback_specs.process_proprioceptive(muscle_lens, muscle_vels)
311
            sensory_feedback = [*sensory_feedback, *muscle_lens, *muscle_vels]
312
313
314
        if self.sfs_muscle_forces == True:
315
            actuator_forces = self.sim.data.qfrc_actuator.flat.copy()
316
317
            #process
318
            actuator_forces = sensory_feedback_specs.process_muscle_forces(actuator_forces)
319
            sensory_feedback = [*sensory_feedback, *actuator_forces]
320
321
322
        if self.sfs_joint_feedback == True:
323
            sensory_qpos = self.sim.data.qpos.flat.copy()
324
            sensory_qvel = self.sim.data.qvel.flat.copy()
325
326
            sensory_qpos, sensory_qvel = sensory_feedback_specs.process_joint_feedback(sensory_qpos, sensory_qvel)
327
            sensory_feedback = [*sensory_feedback, *sensory_qpos, *sensory_qvel]
328
329
330
        if self.sfs_visual_feedback == True:
331
            
332
            #Check if the user specified the musculo bodies to be included
333
            assert len(self.sfs_visual_feedback_bodies) != 0
334
335
            visual_xyz_coords = []
336
            for musculo_body in self.sfs_visual_feedback_bodies:
337
                visual_xyz_coords = [*visual_xyz_coords, *self.sim.data.get_body_xpos(musculo_body)]
338
339
            visual_xyz_coords = sensory_feedback_specs.process_visual_position(visual_xyz_coords)
340
            sensory_feedback = [*sensory_feedback, *visual_xyz_coords]
341
342
        if len(self.sfs_visual_distance_bodies) != 0:
343
            visual_xyz_distance = []
344
            for musculo_tuple in self.sfs_visual_distance_bodies:
345
                body0_xyz = self.sim.data.get_body_xpos(musculo_tuple[0])
346
                body1_xyz = self.sim.data.get_body_xpos(musculo_tuple[1])
347
                tuple_dist = np.abs(body0_xyz - body1_xyz).tolist()
348
                visual_xyz_distance = [*visual_xyz_distance, *tuple_dist]
349
350
            #process
351
            visual_xyz_distance = sensory_feedback_specs.process_visual_distance(visual_xyz_distance)
352
            sensory_feedback = [*sensory_feedback, *visual_xyz_distance]
353
354
        #Save the xpos of the musculo bodies for visual vels
355
        if len(self.sfs_visual_velocity) != 0:
356
357
            #Find the visual vels after the simulation
358
            current_body_xpos = []
359
            for musculo_body in self.sfs_visual_velocity:
360
                body_xpos = self.sim.data.get_body_xpos(musculo_body)
361
                current_body_xpos = [*current_body_xpos, *body_xpos]
362
363
            #Find the velocity
364
            visual_vels = (np.abs(np.array(self.prev_body_xpos) - np.array(current_body_xpos)) / self.dt).tolist()
365
            
366
            #process visual velocity feedback
367
            visual_vels = sensory_feedback_specs.process_visual_velocity(visual_vels)
368
            sensory_feedback= [*sensory_feedback, *visual_vels]
369
370
            #update the self.prev_body_xpos
371
            self.prev_body_xpos = current_body_xpos
372
373
        return np.array(sensory_feedback)