"""Reinforcement-learning environments for a PyBullet musculoskeletal mouse model.

Two gym-style environments built on ``PyBulletEnv``:

* ``Mouse_Env`` — the hand tracks a target whose x-coordinate follows a
  recorded trajectory (``self.x_pos``); y and z stay fixed at the hand's
  starting position.
* ``Mouse_Env_Simulated`` — the hand tracks a synthetic circular target
  (x = sin(theta), z = cos(theta)) for testing without recorded data.

NOTE(review): reconstructed from a whitespace-mangled patch; formatting
normalized to standard Python.
"""
import datetime
import gym
from gym import error, spaces
from gym.utils import seeding
import numpy as np
import itertools
import model_utils
import pybullet as p
import pybullet_data
import yaml
import scipy.io
from pybullet_env import PyBulletEnv
import torch.nn.functional as F

import farms_pylog as pylog
try:
    from farms_muscle.musculo_skeletal_system import MusculoSkeletalSystem
except ImportError:
    pylog.warning("farms-muscle not installed!")
from farms_container import Container

sphere_file = "../files/sphere_small.urdf"

# PyBullet link index used as the end-effector throughout this module.
# NOTE(review): presumably the hand link of the mouse model — confirm
# against the URDF/SDF link ordering.
HAND_LINK = 115


class Mouse_Env(PyBulletEnv):
    """Reaching task driven by a recorded x-trajectory.

    The target moves along x according to ``self.x_pos`` (scaled by
    ``self.scale`` and shifted by ``self.offset``); y and z are pinned to the
    hand's initial position.
    """

    def __init__(self, model_path, muscle_config_file, pose_file, frame_skip, ctrl,
                 timestep, model_offset, vizualize, threshold, cost_scale, max_cycle_len):
        PyBulletEnv.__init__(self, model_path, muscle_config_file, pose_file, frame_skip,
                             ctrl, timestep, model_offset, vizualize, threshold, cost_scale)

        muscle_activations = self.container.muscles.activations
        self.max_cycle_len = max_cycle_len
        self.muscle_params = {}      # muscle name -> stimulation parameter handle
        self.muscle_excitation = {}  # muscle name -> debug-slider id (visualization only)

        ##### TARGET POSITION USING POINT IN SPACE: X, Y, Z #####
        # x, y, z initialize from the hand's starting position; target_pos is
        # what update_target_pos() advances each step.
        self.x_pos = [0]
        self.y_pos = p.getLinkState(self.model, HAND_LINK)[0][1]
        self.z_pos = p.getLinkState(self.model, HAND_LINK)[0][2]

        self.avg_vel = 1
        # np.array (not list) so arithmetic in comp_targ_vel() is valid even
        # if step() runs before the first reset(); reset()/update_target_pos()
        # already use arrays.
        self.target_pos = np.array([self.x_pos[0] / self.scale - self.offset,
                                    self.y_pos, self.z_pos])

        for muscle in self.muscles.muscles.keys():
            self.muscle_params[muscle] = muscle_activations.get_parameter('stim_{}'.format(muscle))
            self.muscle_excitation[muscle] = p.addUserDebugParameter(
                "flexor {}".format(muscle), 0, 1, 0.00)
            self.muscle_params[muscle].value = 0

    def reset(self, pose_file):
        """Reset pose, muscles, episode logs, and target to trajectory start."""
        self.istep = 0
        self.activations = []
        self.hand_positions = []

        model_utils.disable_control(self.model)  # disables torque/position control
        self.reset_model(pose_file)              # resets model position
        self.container.initialize()              # resets container
        self.muscles.setup_integrator()          # resets muscles

        # Reset target position to the first sample of the trajectory.
        self.target_pos = np.array([self.x_pos[0] / self.scale - self.offset,
                                    self.y_pos, self.z_pos])
        if self.use_sphere:
            p.resetBasePositionAndOrientation(
                self.sphere, np.array(self.target_pos),
                p.getQuaternionFromEuler([0, 0, 80.2]))

    def reset_model(self, pose_file):
        """Restore the model to the pose stored in *pose_file*."""
        model_utils.reset_model_position(self.model, pose_file)

    def get_reward(self):
        """Return ``(reward, [d_x, d_y, d_z])`` for the current hand position.

        Outside any per-axis threshold the reward is a flat -5; inside, each
        axis contributes ``1/(1000**d)`` so the total peaks at 3 on target.
        """
        hand_pos = p.getLinkState(self.model, HAND_LINK, computeForwardKinematics=True)[0]  # (x, y, z)

        d_x = np.abs(hand_pos[0] - self.target_pos[0])
        d_y = np.abs(hand_pos[1] - self.target_pos[1])
        d_z = np.abs(hand_pos[2] - self.target_pos[2])
        distances = [d_x, d_y, d_z]

        if d_x > self.threshold_x or d_y > self.threshold_y or d_z > self.threshold_z:
            reward = -5
        else:
            r_x = 1 / (1000 ** d_x)
            r_y = 1 / (1000 ** d_y)
            r_z = 1 / (1000 ** d_z)
            reward = r_x + r_y + r_z

        return reward, distances

    def is_done(self):
        """Episode ends when max steps elapse or the hand strays past a threshold."""
        hand_pos = np.array(p.getLinkState(self.model, HAND_LINK,
                                           computeForwardKinematics=True)[0])  # (x, y, z)
        criteria = hand_pos - self.target_pos

        if self.istep >= self._max_episode_steps:
            return True
        if (np.abs(criteria[0]) > self.threshold_x
                or np.abs(criteria[1]) > self.threshold_y
                or np.abs(criteria[2]) > self.threshold_z):
            return True
        return False

    def update_target_pos(self):
        """Advance the target x to the trajectory sample for the current step."""
        self.target_pos = np.array([self.x_pos[self.istep] / self.scale - self.offset,
                                    self.y_pos, self.z_pos])
        if self.use_sphere:
            p.resetBasePositionAndOrientation(
                self.sphere, np.array(self.target_pos),
                p.getQuaternionFromEuler([0, 0, 80.2]))

    def get_joint_positions_and_velocities(self):
        """Return controlled-joint positions/velocities plus hand position/velocity.

        Joint velocities are scaled by 0.01; the hand's Cartesian position and
        linear velocity (link state indices 0 and 6) are appended.
        """
        joint_positions = []
        joint_velocities = []
        for joint in self.ctrl:
            state = p.getJointState(self.model, joint)
            joint_positions.append(state[0])
            joint_velocities.append(state[1] * .01)
        joint_positions = [*list(np.array(joint_positions)),
                           *list(p.getLinkState(self.model, HAND_LINK,
                                                computeForwardKinematics=True)[0])]  # (x, y, z)
        joint_velocities = [*list(np.array(joint_velocities)),
                            *list(p.getLinkState(self.model, HAND_LINK,
                                                 computeForwardKinematics=True,
                                                 computeLinkVelocity=True)[6])]
        return joint_positions, joint_velocities

    def get_start_state(self):
        """Initial observation: activations, positions, zero velocities, target, distances.

        The ``[0.]*10`` pads the velocity slots (assumes len(ctrl)+3 == 10 —
        TODO confirm); the single ``0.`` is the scalar target x-velocity.
        """
        joint_positions, _ = self.get_joint_positions_and_velocities()
        _, distance = self.get_reward()
        return [*list(np.array(self.get_activations())),
                *list(np.array(joint_positions)),
                *[0.] * 10,
                *list(np.array(self.target_pos)),
                0.,
                *list(np.array(distance))]

    def update_state(self, act, joint_positions, joint_velocities, target_velocity, distances):
        """Concatenate the observation vector; *target_velocity* is a scalar (x only)."""
        return [*list(np.array(act)),
                *list(np.array(joint_positions)),
                *list(np.array(joint_velocities)),
                *list(np.array(self.target_pos)),
                target_velocity,
                *list(np.array(distances))]

    def comp_targ_vel_const(self, scaling=.25):
        """Squash the average velocity to (-1, 1); sinh/cosh is exactly tanh."""
        return np.tanh(self.avg_vel * scaling)

    def comp_targ_vel(self, prev_target):
        """Finite-difference target velocity over a 1 ms step."""
        return (self.target_pos - np.asarray(prev_target)) / .001

    def step(self, forces, timestep):
        """Apply *forces*, advance the simulation, and return (state, reward, done)."""
        # Hold istep on the last trajectory index so update_target_pos() never
        # indexes past the end of x_pos.
        if timestep < (self._max_episode_steps - 1):
            self.istep += 1

        prev_target = self.target_pos
        self.update_target_pos()

        self.controller_to_actuator(forces)
        self.do_simulation()

        act = self.get_activations()

        reward, distances = self.get_reward()
        cost = self.get_cost(forces)
        final_reward = 5 * reward - (self.forces_scale * cost)

        done = self.is_done()
        target_vel = self.comp_targ_vel(prev_target)
        joint_positions, joint_velocities = self.get_joint_positions_and_velocities()
        # Only the x-component of the target moves, so a scalar velocity suffices.
        state = self.update_state(act, joint_positions, joint_velocities,
                                  target_vel[0], distances)

        return state, final_reward, done


########################## SIMULATED ENVIRONMENT #########################################


class Mouse_Env_Simulated(PyBulletEnv):
    """Reaching task with a synthetic circular target (x = sin, z = cos)."""

    def __init__(self, model_path, muscle_config_file, pose_file, frame_skip, ctrl,
                 timestep, model_offset, vizualize, threshold, cost_scale):
        PyBulletEnv.__init__(self, model_path, muscle_config_file, pose_file, frame_skip,
                             ctrl, timestep, model_offset, vizualize, threshold, cost_scale)

        muscle_activations = self.container.muscles.activations
        self.muscle_params = {}      # muscle name -> stimulation parameter handle
        self.muscle_excitation = {}  # muscle name -> debug-slider id (visualization only)

        self.z_offset = 6
        self.x_offset = 20
        self.start_interval = -np.pi / 2
        self.end_interval = 3 * np.pi / 2

        ##### TARGET POSITION USING POINT IN SPACE: X, Y, Z #####
        # One full circle of the target over `timestep` samples; y is pinned
        # to the hand's initial position.
        self.x_theta = np.linspace(self.start_interval, self.end_interval, self.timestep)
        self.x_pos = np.sin(self.x_theta[0])
        self.y_pos = p.getLinkState(self.model, HAND_LINK)[0][1]
        self.z_theta = np.linspace(self.start_interval, self.end_interval, self.timestep)
        self.z_pos = np.cos(self.z_theta[0])

        self.target_pos = [(self.x_pos + self.x_offset) / self.scale,
                           self.y_pos,
                           (self.z_pos + self.z_offset) / self.scale]

        for muscle in self.muscles.muscles.keys():
            self.muscle_params[muscle] = muscle_activations.get_parameter('stim_{}'.format(muscle))
            self.muscle_excitation[muscle] = p.addUserDebugParameter(
                "flexor {}".format(muscle), 0, 1, 0.00)
            self.muscle_params[muscle].value = 0

    def reset(self, pose_file):
        """Reset pose, muscles, and the circular target to its starting angle."""
        self.istep = 0
        model_utils.disable_control(self.model)  # disables torque/position control
        self.reset_model(pose_file)              # resets model position
        self.container.initialize()              # resets container
        self.muscles.setup_integrator()          # resets muscles

        # Reset target position to the start of the circle.
        self.x_theta = np.linspace(self.start_interval, self.end_interval, self.timestep)
        self.x_pos = np.sin(self.x_theta[0])
        self.z_theta = np.linspace(self.start_interval, self.end_interval, self.timestep)
        self.z_pos = np.cos(self.z_theta[0])

        self.target_pos = [(self.x_pos + self.x_offset) / self.scale,
                           self.y_pos,
                           (self.z_pos + self.z_offset) / self.scale]

        if self.use_sphere:
            p.resetBasePositionAndOrientation(
                self.sphere, np.array(self.target_pos),
                p.getQuaternionFromEuler([0, 0, 80.2]))

    def reset_model(self, pose_file):
        """Restore the model to the pose stored in *pose_file*."""
        model_utils.reset_model_position(self.model, pose_file)

    def get_reward(self):
        """Return ``(reward, [d_x, d_y, d_z])``; same shaping as Mouse_Env."""
        hand_pos = p.getLinkState(self.model, HAND_LINK, computeForwardKinematics=True)[0]  # (x, y, z)

        d_x = np.abs(hand_pos[0] - self.target_pos[0])
        d_y = np.abs(hand_pos[1] - self.target_pos[1])
        d_z = np.abs(hand_pos[2] - self.target_pos[2])
        distances = [d_x, d_y, d_z]

        if d_x > self.threshold_x or d_y > self.threshold_y or d_z > self.threshold_z:
            reward = -5
        else:
            r_x = 1 / (1000 ** d_x)
            r_y = 1 / (1000 ** d_y)
            r_z = 1 / (1000 ** d_z)
            reward = r_x + r_y + r_z

        return reward, distances

    def is_done(self):
        """Episode ends when max steps elapse or the hand strays past a threshold."""
        hand_pos = np.array(p.getLinkState(self.model, HAND_LINK,
                                           computeForwardKinematics=True)[0])  # (x, y, z)
        criteria = hand_pos - self.target_pos

        if self.istep >= self._max_episode_steps:
            return True
        if (np.abs(criteria[0]) > self.threshold_x
                or np.abs(criteria[1]) > self.threshold_y
                or np.abs(criteria[2]) > self.threshold_z):
            return True
        return False

    def update_target_pos(self):
        """Advance the target around the circle to the angle for the current step.

        NOTE(review): indexes x_theta/z_theta (length self.timestep) by
        self.istep, which is capped at _max_episode_steps-1 — confirm
        timestep >= _max_episode_steps upstream.
        """
        self.target_pos = [(np.sin(self.x_theta[self.istep]) + self.x_offset) / self.scale,
                           self.y_pos,
                           (np.cos(self.z_theta[self.istep]) + self.z_offset) / self.scale]
        if self.use_sphere:
            p.resetBasePositionAndOrientation(
                self.sphere, np.array(self.target_pos),
                p.getQuaternionFromEuler([0, 0, 80.2]))

    def get_joint_positions_and_velocities(self):
        """Return controlled-joint positions/velocities plus hand position/velocity.

        Joint velocities are scaled by 1/100; the hand's Cartesian position and
        linear velocity (link state indices 0 and 6) are appended.
        """
        joint_positions = []
        joint_velocities = []
        for joint in self.ctrl:
            state = p.getJointState(self.model, joint)
            joint_positions.append(state[0])
            joint_velocities.append(state[1] / 100)
        joint_positions = [*list(np.array(joint_positions)),
                           *list(p.getLinkState(self.model, HAND_LINK,
                                                computeForwardKinematics=True)[0])]  # (x, y, z)
        joint_velocities = [*list(np.array(joint_velocities)),
                            *list(p.getLinkState(self.model, HAND_LINK,
                                                 computeForwardKinematics=True,
                                                 computeLinkVelocity=True)[6])]
        return joint_positions, joint_velocities

    def update_state(self, act, joint_positions, joint_velocities, target_pos,
                     target_velocity, distances):
        """Concatenate the observation vector; *target_velocity* is a 3-vector.

        BUG FIX: callers previously passed the scalar ``target_vel[0]``, which
        made ``list(np.array(...))`` raise on a 0-d array and disagreed with
        get_cur_state()'s three zero-padded velocity slots.
        """
        return [*list(np.array(act)),
                *list(np.array(joint_positions)),
                *list(np.array(joint_velocities)),
                *list(np.array(target_pos)),
                *list(np.array(target_velocity)),
                *list(np.array(distances))]

    def get_cur_state(self):
        """Current observation with zero joint (10) and target (3) velocity slots."""
        joint_positions, _ = self.get_joint_positions_and_velocities()
        _, distance = self.get_reward()
        return [*list(np.array(self.get_activations())),
                *list(np.array(joint_positions)),
                *[0.] * 10,
                *list(np.array(self.target_pos)),
                *[0.] * 3,
                *list(np.array(distance))]

    def step(self, forces, timestep):
        """Apply *forces*, advance the simulation, and return (state, reward, done)."""
        # Hold istep on the last index so update_target_pos() never overruns x_theta.
        if timestep < (self._max_episode_steps - 1):
            self.istep += 1

        # BUG FIX: target_pos is a plain list here; convert to arrays so the
        # velocity finite-difference below is elementwise, not a list TypeError.
        prev_target = np.asarray(self.target_pos)
        self.update_target_pos()
        curr_target = np.asarray(self.target_pos)

        self.controller_to_actuator(forces)
        self.do_simulation()

        act = self.get_activations()

        reward, distances = self.get_reward()
        cost = self.get_cost(forces)
        final_reward = 5 * reward - (self.forces_scale * cost)

        done = self.is_done()
        target_vel = (curr_target - prev_target) / .001
        joint_positions, joint_velocities = self.get_joint_positions_and_velocities()
        # Pass the full 3-vector velocity (see update_state BUG FIX note).
        state = self.update_state(act, joint_positions, joint_velocities,
                                  curr_target, target_vel, distances)

        return state, final_reward, done