Diff of /osim/redis/client.py [000000] .. [077a87]

Switch to side-by-side view

--- a
+++ b/osim/redis/client.py
@@ -0,0 +1,152 @@
+import redis
+import json
+import os
+import pkg_resources
+import sys
+import numpy as np
+import msgpack
+import msgpack_numpy as m
+m.patch()
+import hashlib
+import random
+from osim.redis import messages
+import time
+
+import logging
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+class Client(object):
+    """
+        Redis client to interface with osim-rl redis-service
+
+        The Docker container hosts a redis-server inside the container.
+        This client connects to the same redis-server, and communicates with the service.
+
+        The service eventually will reside outside the docker container, and will communicate
+        with the client only via the redis-server of the docker container.
+
+        On the instantiation of the docker container, one service will be instantiated parallely.
+
+        The service will accepts commands at "`service_id`::commands"
+        where `service_id` is either provided as an `env` variable or is
+        instantiated to "osim_rl_redis_service_id"
+    """
+    def __init__(self, remote_host='127.0.0.1', remote_port=6379, remote_db=0, remote_password=None, verbose=False):
+        self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password)
+        self.namespace = "osim-rl"
+        try:
+            self.service_id =  os.environ['osim_rl_redis_service_id']
+        except KeyError:
+            self.service_id = "osim_rl_redis_service_id"
+        self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id)
+        self.verbose = verbose
+        self.ping_pong()
+
+    def get_redis_connection(self):
+        return redis.Redis(connection_pool=self.redis_pool)
+
+    def _generate_response_channel(self):
+        random_hash = hashlib.md5("{}".format(random.randint(0, 10**10)).encode('utf-8')).hexdigest()
+        response_channel = "{}::{}::response::{}".format(   self.namespace,
+                                                            self.service_id,
+                                                            random_hash)
+        return response_channel
+
+    def _blocking_request(self, _request):
+        """
+            request:
+                -command_type
+                -payload
+                -response_channel
+            response: (on response_channel)
+                - RESULT
+            * Send the payload on command_channel (self.namespace+"::command")
+                ** redis-left-push (LPUSH)
+            * Keep listening on response_channel (BLPOP)
+        """
+        assert type(_request) ==type({})
+        _request['response_channel'] = self._generate_response_channel()
+
+        _redis = self.get_redis_connection()
+        """
+            The client always pushes in the left
+            and the service always pushes in the right
+        """
+        if self.verbose: print("Request : ", _response)
+        # Push request in command_channels
+        payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
+        _redis.lpush(self.command_channel, payload)
+        ## TODO: Check if we can use `repr` for json.dumps string serialization
+        # Wait with a blocking pop for the response
+        _response = _redis.blpop(_request['response_channel'])[1]
+        if self.verbose: print("Response : ", _response)
+        _response = msgpack.unpackb(_response, object_hook=m.decode, encoding="utf8")
+        if _response['type'] == messages.OSIM_RL.ERROR:
+            raise Exception(str(_response))
+        else:
+            return _response
+
+    def ping_pong(self):
+        """
+            Official Handshake with the grading service
+            Send a PING
+            and wait for PONG
+            If not PONG, raise error
+        """
+        _request = {}
+        _request['type'] = messages.OSIM_RL.PING
+        _request['payload'] = {}
+        _response = self._blocking_request(_request)
+        if _response['type'] != messages.OSIM_RL.PONG:
+            raise Exception("Unable to perform handshake with the redis service. Expected PONG; received {}".format(json.dumps(_response)))
+        else:
+            return True
+
+    def env_create(self):
+        _request = {}
+        _request['type'] = messages.OSIM_RL.ENV_CREATE
+        _request['payload'] = {}
+        _response = self._blocking_request(_request)
+        observation = _response['payload']['observation']
+        return observation
+
+    def env_reset(self):
+        _request = {}
+        _request['type'] = messages.OSIM_RL.ENV_RESET
+        _request['payload'] = {}
+        _response = self._blocking_request(_request)
+        observation = _response['payload']['observation']
+        return observation
+
+    def env_step(self, action, render=False):
+        """
+            Respond with [observation, reward, done, info]
+        """
+        action = np.array(action).tolist()
+        _request = {}
+        _request['type'] = messages.OSIM_RL.ENV_STEP
+        _request['payload'] = {}
+        _request['payload']['action'] = action
+        _response = self._blocking_request(_request)
+        _payload = _response['payload']
+        observation = _payload['observation']
+        reward = _payload['reward']
+        done = _payload['done']
+        info = _payload['info']
+        return [observation, reward, done, info]
+
+    def submit(self):
+        _request = {}
+        _request['type'] = messages.OSIM_RL.ENV_SUBMIT
+        _request['payload'] = {}
+        _response = self._blocking_request(_request)
+        if os.getenv("CROWDAI_BLOCKING_SUBMIT"):
+            """
+            If the submission is supposed to happen as a blocking submit,
+            then wait indefinitely for the evaluator to decide what to 
+            do with the container.
+            """
+            while True:
+                time.sleep(10)
+        return _response['payload']