import redis
import json
import os
import pkg_resources
import sys
import numpy as np
import msgpack
import msgpack_numpy as m
m.patch()
import hashlib
import random
from osim.redis import messages
import time
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Client(object):
"""
Redis client to interface with osim-rl redis-service
The Docker container hosts a redis-server inside the container.
This client connects to the same redis-server, and communicates with the service.
The service eventually will reside outside the docker container, and will communicate
with the client only via the redis-server of the docker container.
On the instantiation of the docker container, one service will be instantiated parallely.
The service will accepts commands at "`service_id`::commands"
where `service_id` is either provided as an `env` variable or is
instantiated to "osim_rl_redis_service_id"
"""
def __init__(self, remote_host='127.0.0.1', remote_port=6379, remote_db=0, remote_password=None, verbose=False):
self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password)
self.namespace = "osim-rl"
try:
self.service_id = os.environ['osim_rl_redis_service_id']
except KeyError:
self.service_id = "osim_rl_redis_service_id"
self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id)
self.verbose = verbose
self.ping_pong()
def get_redis_connection(self):
return redis.Redis(connection_pool=self.redis_pool)
def _generate_response_channel(self):
random_hash = hashlib.md5("{}".format(random.randint(0, 10**10)).encode('utf-8')).hexdigest()
response_channel = "{}::{}::response::{}".format( self.namespace,
self.service_id,
random_hash)
return response_channel
def _blocking_request(self, _request):
"""
request:
-command_type
-payload
-response_channel
response: (on response_channel)
- RESULT
* Send the payload on command_channel (self.namespace+"::command")
** redis-left-push (LPUSH)
* Keep listening on response_channel (BLPOP)
"""
assert type(_request) ==type({})
_request['response_channel'] = self._generate_response_channel()
_redis = self.get_redis_connection()
"""
The client always pushes in the left
and the service always pushes in the right
"""
if self.verbose: print("Request : ", _response)
# Push request in command_channels
payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
_redis.lpush(self.command_channel, payload)
## TODO: Check if we can use `repr` for json.dumps string serialization
# Wait with a blocking pop for the response
_response = _redis.blpop(_request['response_channel'])[1]
if self.verbose: print("Response : ", _response)
_response = msgpack.unpackb(_response, object_hook=m.decode, encoding="utf8")
if _response['type'] == messages.OSIM_RL.ERROR:
raise Exception(str(_response))
else:
return _response
def ping_pong(self):
"""
Official Handshake with the grading service
Send a PING
and wait for PONG
If not PONG, raise error
"""
_request = {}
_request['type'] = messages.OSIM_RL.PING
_request['payload'] = {}
_response = self._blocking_request(_request)
if _response['type'] != messages.OSIM_RL.PONG:
raise Exception("Unable to perform handshake with the redis service. Expected PONG; received {}".format(json.dumps(_response)))
else:
return True
def env_create(self):
_request = {}
_request['type'] = messages.OSIM_RL.ENV_CREATE
_request['payload'] = {}
_response = self._blocking_request(_request)
observation = _response['payload']['observation']
return observation
def env_reset(self):
_request = {}
_request['type'] = messages.OSIM_RL.ENV_RESET
_request['payload'] = {}
_response = self._blocking_request(_request)
observation = _response['payload']['observation']
return observation
def env_step(self, action, render=False):
"""
Respond with [observation, reward, done, info]
"""
action = np.array(action).tolist()
_request = {}
_request['type'] = messages.OSIM_RL.ENV_STEP
_request['payload'] = {}
_request['payload']['action'] = action
_response = self._blocking_request(_request)
_payload = _response['payload']
observation = _payload['observation']
reward = _payload['reward']
done = _payload['done']
info = _payload['info']
return [observation, reward, done, info]
def submit(self):
_request = {}
_request['type'] = messages.OSIM_RL.ENV_SUBMIT
_request['payload'] = {}
_response = self._blocking_request(_request)
if os.getenv("CROWDAI_BLOCKING_SUBMIT"):
"""
If the submission is supposed to happen as a blocking submit,
then wait indefinitely for the evaluator to decide what to
do with the container.
"""
while True:
time.sleep(10)
return _response['payload']