#!/usr/bin/env python
from __future__ import print_function
import redis
from osim.redis import messages
import json
import numpy as np
import msgpack
import msgpack_numpy as m
m.patch()
import osim
from osim.env import *
import os
import timeout_decorator
import time
########################################################
# CONSTANTS
########################################################
# Maximum number of seconds the service waits for the next command
# (applied per command via timeout_decorator): 20 minutes.
PER_STEP_TIMEOUT = 60 * 20
class OsimRlRedisService:
    """Evaluation service that drives an ``L2M2019Env`` for a remote agent.

    Commands are popped (blocking) from the Redis list ``self.command_channel``
    as msgpack payloads carrying ``type``, ``payload`` and
    ``response_channel`` keys. Every response is msgpack-encoded and pushed
    onto the command's ``response_channel``.

    Supported command types (``messages.OSIM_RL``): PING, ENV_CREATE,
    ENV_RESET, ENV_STEP and ENV_SUBMIT. ``run`` returns the ENV_SUBMIT
    response, or an error template after an unknown command or a failure
    while handling a command.

    NOTE: observations are forwarded unmodified, also when ``report`` is set
    (no truncation of the observation is applied).
    """

    def __init__(self,
                 osim_rl_redis_service_id='osim_rl_redis_service_id',
                 seed_map=False,
                 max_steps=1000,
                 remote_host='127.0.0.1',
                 remote_port=6379,
                 remote_db=0,
                 remote_password=None,
                 difficulty=1,
                 max_obstacles=10,
                 visualize=False,
                 report=None,
                 verbose=False):
        """Store the configuration and create the Redis connection pool.

        Args:
            osim_rl_redis_service_id: suffix of the command channel name.
            seed_map: comma separated seeds (e.g. ``"11,22,33"``), one per
                simulation; a falsy value runs one randomly-seeded simulation.
            max_steps: number of steps after which a simulation is reported
                as ``done`` regardless of the environment.
            remote_host, remote_port, remote_db, remote_password: Redis
                server settings.
            difficulty, visualize, report: forwarded to ``L2M2019Env``.
            max_obstacles: stored only; not used by this class.
            verbose: print requests, responses and progress information.

        TODO: Expose more RunEnv related variables
        """
        print("Attempting to connect to redis server at {}:{}/{}".format(remote_host, remote_port, remote_db))
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.remote_db = remote_db
        self.remote_password = remote_password
        # The pool is created here; connections are opened lazily.
        self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password)
        self.namespace = "osim-rl"
        self.service_id = osim_rl_redis_service_id
        self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id)
        self.env = False                # L2M2019Env instance once ENV_CREATE was handled
        self.env_available = False      # False after done / max_steps until the next reset
        self.reward = 0                 # reward summed over all simulations
        self.simulation_count = 0       # index into seed_map of the current simulation
        self.simualation_rewards = []   # per-simulation reward totals (misspelt name kept for compatibility)
        self.simulation_times = []      # wall-clock seconds of each finished simulation
        self.begin_simulation = False   # time.time() when the current simulation started
        self.current_step = 0           # steps taken in the current simulation
        self.difficulty = difficulty
        self.max_obstacles = max_obstacles
        self.verbose = verbose
        self.visualize = visualize
        self.report = report
        self.max_steps = max_steps
        self.initalize_seed_map(seed_map)

    def initalize_seed_map(self, seed_map_string):
        """Populate ``self.seed_map`` with one integer seed per simulation.

        Args:
            seed_map_string: comma separated integers such as ``"11,22,33"``,
                or a falsy value for a single simulation with a random seed.

        Raises:
            TypeError: if a truthy value that is not a string is given.
            ValueError: if an entry cannot be parsed as an integer.
        """
        if not seed_map_string:
            # Drawn below 2**31 so the call works with NumPy's default 32-bit
            # integer type on Windows and is a valid legacy NumPy seed.
            self.seed_map = [int(np.random.randint(0, 2**31 - 1))]
            return
        if not isinstance(seed_map_string, str):
            raise TypeError("seed_map must be a comma separated string of integers, got {!r}".format(seed_map_string))
        self.seed_map = [int(seed) for seed in seed_map_string.split(",")]

    def get_redis_connection(self):
        """Return a Redis client from the pool after verifying it with PING.

        Raises:
            Exception: if the server cannot be reached.
        """
        redis_conn = redis.Redis(connection_pool=self.redis_pool)
        try:
            redis_conn.ping()
        except redis.exceptions.RedisError:
            raise Exception(
                "Unable to connect to redis server at {}:{} ."
                "Are you sure there is a redis-server running at the "
                "specified location ?".format(
                    self.remote_host,
                    self.remote_port
                )
            )
        return redis_conn

    def _error_template(self, payload):
        """Wrap ``payload`` in a response of type ``messages.OSIM_RL.ERROR``."""
        _response = {}
        _response['type'] = messages.OSIM_RL.ERROR
        _response['payload'] = payload
        return _response

    def _send(self, _redis, channel, response):
        """msgpack-encode ``response`` and push it onto the list ``channel``."""
        if self.verbose: print("Responding with : ", response)
        _redis.rpush(channel, msgpack.packb(response, default=m.encode, use_bin_type=True))

    @timeout_decorator.timeout(PER_STEP_TIMEOUT)  # timeout for each command
    def get_next_command(self, _redis):
        """Block until a command is popped from the command channel; return its raw value.

        timeout_decorator raises TimeoutError if nothing arrives within
        PER_STEP_TIMEOUT seconds.
        """
        command = _redis.brpop(self.command_channel)[1]
        return command

    def _handle_ping(self, _redis, command, response_channel):
        """PING: initial handshake, answer with PONG."""
        self._send(_redis, response_channel, {
            'type': messages.OSIM_RL.PONG,
            'payload': {},
        })

    def _handle_env_create(self, _redis, command, response_channel):
        """ENV_CREATE: build the environment and answer with the first observation.

        Returns an error template (which ends ``run``) if an environment
        already exists.
        """
        if self.env:
            _error = self._error_template("Attempt to create environment when one already exists.")
            self._send(_redis, response_channel, _error)
            return _error
        self.env = L2M2019Env(visualize=self.visualize,
                              difficulty=self.difficulty,
                              seed=self.seed_map[self.simulation_count],
                              report=self.report)
        _observation = self.env.reset(project=False)
        self.begin_simulation = time.time()
        self.simualation_rewards.append(0)
        self.env_available = True
        self.current_step = 0
        self._send(_redis, response_channel, {
            'type': messages.OSIM_RL.ENV_CREATE_RESPONSE,
            'payload': {'observation': _observation},
        })

    def _handle_env_reset(self, _redis, command, response_channel):
        """ENV_RESET: start the next seeded simulation.

        Answers with its first observation, or ``False`` once every seed in
        ``seed_map`` has been used.
        """
        self.simulation_count += 1
        if self.begin_simulation:
            self.simulation_times.append(time.time() - self.begin_simulation)
        self.begin_simulation = time.time()
        if self.seed_map and self.simulation_count < len(self.seed_map):
            if not self.env:
                raise Exception("Attempt to call `reset` function on a non existent `env`")
            _observation = self.env.reset(seed=self.seed_map[self.simulation_count], project=False)
            self.simualation_rewards.append(0)
            self.env_available = True
            self.current_step = 0
        else:
            # No seeds left: tell the client the evaluation is over.
            _observation = False
        self._send(_redis, response_channel, {
            'type': messages.OSIM_RL.ENV_RESET_RESPONSE,
            'payload': {'observation': _observation},
        })

    def _handle_env_step(self, _redis, command, response_channel):
        """ENV_STEP: apply ``payload['action']``; answer with observation, reward, done and info.

        ``done`` is forced to True once ``max_steps`` steps were taken.
        After a done step, further steps are rejected until the next reset.
        """
        action = np.array(command['payload']['action'])
        if not self.env:
            raise Exception("Attempt to call `step` function on a non existent `env`")
        if not self.env_available:
            raise Exception("Attempt to call `step` function after the end of a simulation (done=True or max_steps={} reached). Please reset your environment before calling the `step` function again".format(self.max_steps))
        _observation, reward, done, info = self.env.step(action, project=False)
        self.reward += reward
        self.simualation_rewards[-1] += reward
        self.current_step += 1
        if self.current_step >= self.max_steps:
            done = True
        if done:
            # Mark env as unavailable until next reset
            self.env_available = False
        self._send(_redis, response_channel, {
            'type': messages.OSIM_RL.ENV_STEP_RESPONSE,
            'payload': {
                'observation': _observation,
                'reward': reward,
                'done': done,
                'info': info,
            },
        })
        if self.verbose: print("Current Step : ", self.current_step)

    def _handle_env_submit(self, _redis, command, response_channel):
        """ENV_SUBMIT: send the final scores and return them (which ends ``run``)."""
        _response = {
            'type': messages.OSIM_RL.ENV_SUBMIT_RESPONSE,
            'payload': {
                # Mean over the configured seeds, not only the simulations run.
                'mean_reward': float(self.reward) / len(self.seed_map),
                'simulation_rewards': self.simualation_rewards,
                'simulation_times': self.simulation_times,
            },
        }
        self._send(_redis, response_channel, _response)
        return _response

    def run(self):
        """Serve commands until the evaluation ends.

        Returns:
            The ENV_SUBMIT response, or an error template (``type`` equal to
            ``messages.OSIM_RL.ERROR``) after an unknown command or a command
            whose handling raised.

        Raises:
            Exception: if no command arrives within PER_STEP_TIMEOUT seconds
                or the Redis server is unreachable.
        """
        # Each handler returns None to keep serving, or a response that ends run().
        handlers = {
            messages.OSIM_RL.PING: self._handle_ping,
            messages.OSIM_RL.ENV_CREATE: self._handle_env_create,
            messages.OSIM_RL.ENV_RESET: self._handle_env_reset,
            messages.OSIM_RL.ENV_STEP: self._handle_env_step,
            messages.OSIM_RL.ENV_SUBMIT: self._handle_env_submit,
        }
        print("Listening for commands at : ", self.command_channel)
        while True:
            try:
                _redis = self.get_redis_connection()
                command = self.get_next_command(_redis)
                print("Command Service: ", command)
            except timeout_decorator.timeout_decorator.TimeoutError:
                raise Exception("Timeout in step {} of simulation {}".format(self.current_step, self.simulation_count))
            # Used for the error response if the command cannot be decoded.
            command_response_channel = "default_response_channel"
            if self.verbose:
                print("Self.Reward : ", self.reward)
                print("Current Simulation : ", self.simulation_count)
                if self.seed_map and self.simulation_count < len(self.seed_map):
                    print("Current SEED : ", self.seed_map[self.simulation_count])
            try:
                # raw=False decodes msgpack strings to text; it replaces the
                # `encoding` argument that msgpack 1.0 removed.
                command = msgpack.unpackb(command, object_hook=m.decode, raw=False)
                if self.verbose: print("Received Request : ", command)
                command_response_channel = command['response_channel']
                handler = handlers.get(command['type'])
                if handler is None:
                    _error = self._error_template(
                        "UNKNOWN_REQUEST:{}".format(
                            str(command)))
                    self._send(_redis, command_response_channel, _error)
                    return _error
                result = handler(_redis, command, command_response_channel)
                if result is not None:
                    return result
            except Exception as e:
                print("Error : ", str(e))
                _error = self._error_template(str(e))
                _redis.rpush(command_response_channel,
                             msgpack.packb(_error, default=m.encode, use_bin_type=True))
                return _error
if __name__ == "__main__":
    # Run the grading service against a Redis server and report the outcome.
    import argparse
    parser = argparse.ArgumentParser(description='Submit the result to AIcrowd')
    # type=int gives a clean argparse usage error instead of a ValueError traceback.
    parser.add_argument('--port', dest='port', action='store', type=int, required=True,
                        help="port of the redis server")
    parser.add_argument('--host', dest='host', default='127.0.0.1', required=False,
                        help="host of the redis server")
    parser.add_argument('--service_id', dest='service_id', default='osim_rl_redis_service_id', required=False)
    parser.add_argument('--seed_map',
                        dest='seed_map',
                        default="11,22,33",
                        help="comma separated list of seed values",
                        required=False)
    args = parser.parse_args()
    seed_map = args.seed_map
    print("Seeds : ", seed_map.split(","))
    grader = OsimRlRedisService(osim_rl_redis_service_id=args.service_id,
                                remote_host=args.host,
                                remote_port=args.port,
                                seed_map=seed_map,
                                max_steps=1000,
                                verbose=True)
    result = grader.run()
    if result['type'] == messages.OSIM_RL.ENV_SUBMIT_RESPONSE:
        cumulative_results = result['payload']
        print("Results : ", cumulative_results)
    elif result['type'] == messages.OSIM_RL.ERROR:
        error = result['payload']
        raise Exception("Evaluation Failed : {}".format(str(error)))
    else:
        # Evaluation failed
        print("Evaluation Failed : ", result['payload'])