[f9c9f2]: / submission / submit_env.py

Download this file

106 lines (87 with data), 3.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
import gym
from gym.spaces import Box
class SubmitEnv:
def __init__(self):
from osim.http.client import Client
remote_base = "http://grader.crowdai.org:1729"
self.crowdai_token = "e47cb9f7fd533dc036dbd5d65d0d68c3"
self.client = Client(remote_base)
self.first_reset = True
self.action_space = Box(low=0, high=1, shape=[19])
self.observation_space = Box(low=-3, high=3, shape=[224])
self.episodic_length = 0
self.score = 0.0
self.reward_range = None
self.metadata = None
def reset(self):
self.episodic_length = 0
self.score = 0
if self.first_reset:
self.first_reset = False
return self.get_observation(self.client.env_create(self.crowdai_token, env_id="ProstheticsEnv"))
else:
obs = self.client.env_reset()
if obs is None:
self.client.submit()
print('SUBMITTED')
import sys
sys.exit(0)
return self.get_observation(obs)
def step(self, action):
[obs, rew, done, info] = self.client.env_step(action.tolist(), True)
self.episodic_length += 1
self.score += rew
pelvis_vx = obs['body_vel']['pelvis'][0]
print(f'timestamp={self.episodic_length:3d} score={self.score:5.2f} velocity={pelvis_vx:3.2f}')
import sys
sys.stdout.flush()
return self.get_observation(obs), rew, done, info
def close(self):
pass
def get_observation(self, state_desc):
res = []
pelvis = None
for body_part in ["pelvis", "head", "torso", "toes_l", "talus_l", "pros_foot_r", "pros_tibia_r"]:
cur = []
cur += state_desc["body_pos"][body_part]
cur += state_desc["body_vel"][body_part]
cur += state_desc["body_acc"][body_part]
cur += state_desc["body_pos_rot"][body_part]
cur += state_desc["body_vel_rot"][body_part]
cur += state_desc["body_acc_rot"][body_part]
if body_part == "pelvis":
pelvis = cur
res += cur[1:] # make sense, pelvis.x is not important
else:
cur[0] -= pelvis[0]
cur[2] -= pelvis[2] # relative position work for x / z axis
res += cur
for joint in ["ankle_l", "ankle_r", "back", "hip_l", "hip_r", "knee_l", "knee_r"]:
res += state_desc["joint_pos"][joint]
res += state_desc["joint_vel"][joint]
res += state_desc["joint_acc"][joint]
for muscle in sorted(state_desc["muscles"].keys()):
res += [state_desc["muscles"][muscle]["activation"]]
res += [state_desc["muscles"][muscle]["fiber_length"]]
res += [state_desc["muscles"][muscle]["fiber_velocity"]]
cm_pos = state_desc["misc"]["mass_center_pos"] # relative x / z axis center of mass position
cm_pos[0] -= pelvis[0]
cm_pos[2] -= pelvis[0]
res = res + cm_pos + state_desc["misc"]["mass_center_vel"] + state_desc["misc"]["mass_center_acc"]
return res
class SubmitRepeatActionEnv(gym.ActionWrapper):
def __init__(self, env, repeat=2):
super(SubmitRepeatActionEnv, self).__init__(env)
self._repeat = repeat
def step(self, action):
total_reward = 0.0
for _ in range(self._repeat):
obs, reward, done, info = self.env.step(action)
total_reward += reward
if done:
break
return obs, total_reward, done, info
def make_submit_env():
env = SubmitEnv()
env = SubmitRepeatActionEnv(env=env, repeat=2)
return env