Diff of /nips/round2_env.py [000000] .. [f9c9f2]


--- a
+++ b/nips/round2_env.py
@@ -0,0 +1,175 @@
+import random
+from osim.env import ProstheticsEnv
+import gym
+import numpy as np
+from gym.spaces import Box
+
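+# length of the flat observation vector assembled in get_observation() below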
+OBSERVATION_SPACE = 224
+
+
+class CustomEnv(ProstheticsEnv):
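+    """ProstheticsEnv round 2 (difficulty=1) with a shaped reward and a flat 224-dimensional observation."""
+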
+    def __init__(self, visualization=True, integrator_accuracy=5e-5):
+        # difficulty = 1 for round 2 environment
+        super().__init__(visualization, integrator_accuracy, difficulty=1)
+        self.episode_length = 0
+        self.episode_original_reward = 0.0
+        self.episode_shaped_reward = 0.0
+        self.episode_activation_penalty = 0.0
+        self.episode_vx_penalty = 0.0
+        self.episode_vz_penalty = 0.0
+        self.observation_space = Box(low=-10, high=+10, shape=[OBSERVATION_SPACE])
+
+        # draw an initial seed; reset() consumes it and then draws the next one
+        random.seed()
+        self.random_seed = random.randint(0, 2 ** 32 - 1)
+
+    def step(self, action, project=True):
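+        # clip the muscle excitations to [0, 1] before stepping the simulator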
+        obs, r, done, info = super(CustomEnv, self).step(np.clip(np.array(action), 0.0, 1.0), project=project)
+        self.episode_length += 1
+
+        # early termination penalty
+        if done and self.episode_length < self.time_limit:
+            r -= 2
+
+        original_reward = super(CustomEnv, self).reward()
+        self.episode_original_reward += original_reward
+        self.episode_shaped_reward += r
+
+        state_desc = self.get_state_desc()
+
+        # activation penalty
+        self.episode_activation_penalty += np.sum(np.array(self.osim_model.get_activations()) ** 2) * 0.001
+        # velocity-matching penalty along the x and z axes
+        self.episode_vx_penalty += (state_desc["body_vel"]["pelvis"][0] - state_desc["target_vel"][0]) ** 2
+        self.episode_vz_penalty += (state_desc["body_vel"]["pelvis"][2] - state_desc["target_vel"][2]) ** 2
+
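+        # on episode end, expose per-episode statistics for logging ('r' and 'l' follow the usual episode-info convention)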
+        if done:
+            info['episode'] = {
+                'r': self.episode_original_reward,
+                'l': self.episode_length,
+                "shaped_reward": self.episode_shaped_reward,
+                "activation_penalty": self.episode_activation_penalty,
+                "vx_penalty": self.episode_vx_penalty,
+                "vz_penalty": self.episode_vz_penalty
+            }
+
+        return obs, r, done, info
+
+    def reset(self, project=True):
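+        # reset the underlying env with the current seed, then advance the seed deterministically for the next episode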
+        super().reset(project=project, seed=self.random_seed)
+        random.seed(self.random_seed)
+        self.random_seed = random.randint(0, 2 ** 32 - 1)
+        self.episode_length = 0
+        self.episode_original_reward = 0.0
+        self.episode_shaped_reward = 0.0
+        self.episode_activation_penalty = 0.0
+        self.episode_vx_penalty = 0.0
+        self.episode_vz_penalty = 0.0
+        obs = self.get_observation()
+        return obs
+
+    def get_observation_space_size(self):
+        return OBSERVATION_SPACE
+
+    def get_observation(self):
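+        """Flatten body kinematics (pelvis-relative x/z), joint states, muscle states, center of mass and target-velocity features."""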
+        state_desc = self.get_state_desc()
+
+        res = []
+        pelvis = None
+
+        for body_part in ["pelvis", "head", "torso", "toes_l", "talus_l", "pros_foot_r", "pros_tibia_r"]:
+            cur = []
+            cur += state_desc["body_pos"][body_part]
+            cur += state_desc["body_vel"][body_part]
+            cur += state_desc["body_acc"][body_part]
+            cur += state_desc["body_pos_rot"][body_part]
+            cur += state_desc["body_vel_rot"][body_part]
+            cur += state_desc["body_acc_rot"][body_part]
+            if body_part == "pelvis":
+                pelvis = cur
+                res += cur[1:]  # drop pelvis.x: the absolute forward position is not informative
+            else:
+                cur[0] -= pelvis[0]
+                cur[2] -= pelvis[2]     # x / z positions made relative to the pelvis
+                res += cur
+
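+        # joint angles, angular velocities and accelerations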
+        for joint in ["ankle_l", "ankle_r", "back", "hip_l", "hip_r", "knee_l", "knee_r"]:
+            res += state_desc["joint_pos"][joint]
+            res += state_desc["joint_vel"][joint]
+            res += state_desc["joint_acc"][joint]
+
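+        # per-muscle activation, fiber length and fiber velocity (sorted keys give a stable ordering)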
+        for muscle in sorted(state_desc["muscles"].keys()):
+            res += [state_desc["muscles"][muscle]["activation"]]
+            res += [state_desc["muscles"][muscle]["fiber_length"]]
+            res += [state_desc["muscles"][muscle]["fiber_velocity"]]
+
+        # center of mass position, relative to the pelvis along the x / z axes
+        cm_pos = state_desc["misc"]["mass_center_pos"]
+        cm_pos[0] -= pelvis[0]
+        cm_pos[2] -= pelvis[2]
+        res = res + cm_pos
+
+        # information about target velocity
+        target_vx, target_vz = state_desc["target_vel"][0], state_desc["target_vel"][2]
+        current_vx, current_vz = state_desc["body_vel"]["pelvis"][0], state_desc["body_vel"]["pelvis"][2]
+        diff_vx, diff_vz = current_vx - target_vx, current_vz - target_vz
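+        # clip the velocity error to ±0.3 (x) and ±0.15 (z), shifting the reported target so that target = current - diff stays consistent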
+        if diff_vx > 0.3:
+            diff_vx, target_vx = 0.3, current_vx - 0.3
+        elif diff_vx < -0.3:
+            diff_vx, target_vx = -0.3, current_vx + 0.3
+
+        if diff_vz > 0.15:
+            diff_vz, target_vz = 0.15, current_vz - 0.15
+        elif diff_vz < -0.15:
+            diff_vz, target_vz = -0.15, current_vz + 0.15
+
+        res = res + [diff_vz, target_vx, diff_vx, diff_vx, target_vz, diff_vz]
+
+        return res
+
+    def reward(self):
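+        """Shaped reward: exponential velocity-matching terms minus pelvis-height, activation and velocity-error penalties, scaled by 0.5."""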
+        state_desc = self.get_state_desc()
+        prev_state_desc = self.get_prev_state_desc()
+        if not prev_state_desc:
+            return 0
+
+        target_vx, target_vz = state_desc["target_vel"][0], state_desc["target_vel"][2]
+        current_vx, current_vz = state_desc["body_vel"]["pelvis"][0], state_desc["body_vel"]["pelvis"][2]
+        pelvis_y = state_desc["body_pos"]["pelvis"][1]
+
+        reward_x = np.exp(-abs(target_vx - current_vx))
+        reward_z = np.exp(-abs(target_vz - current_vz))
+        reward = reward_x + reward_z
+
+        penalty = 0.0
+        # penalty for the pelvis dropping below 0.7 m
+        low_pelvis = max(0, 0.7 - pelvis_y)
+        penalty += low_pelvis * 20
+        # activation penalty
+        penalty += np.sum(np.array(self.osim_model.get_activations()) ** 2) * 0.001
+        # velocity-matching penalty along the x and z axes
+        penalty += abs(current_vx - target_vx) * 2
+        penalty += abs(current_vz - target_vz) * 2
+
+        reward -= penalty
+
+        return reward * 0.5
+
+
+class CustomActionWrapper(gym.ActionWrapper):
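+    """Repeat each action `action_repeat` times, summing rewards; the applied action is stored in `info["action"]`."""
+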
+    def __init__(self, env, action_repeat):
+        super(CustomActionWrapper, self).__init__(env)
+        self.action_repeat = action_repeat
+
+    def step(self, action):
+        action = self.action(action)
+        rew = 0
+        for i in range(self.action_repeat):
+            obs, r, done, info = self.env.step(action)
+            rew += r
+            if done:
+                break
+        info["action"] = action
+        return obs, rew, done, info
+
+    def action(self, action):
+        return action
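+
+
+# A minimal usage sketch (illustration only, not part of the original file; it assumes an
+# osim-rl round 2 installation and picks action_repeat=3 arbitrarily):
+#
+#   env = CustomEnv(visualization=False)
+#   env = CustomActionWrapper(env, action_repeat=3)
+#   obs = env.reset()
+#   while True:
+#       obs, reward, done, info = env.step(env.action_space.sample())
+#       if done:
+#           break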