Diff of /osim/redis/client.py [000000] .. [077a87]

Switch to unified view

a b/osim/redis/client.py
1
import redis
2
import json
3
import os
4
import pkg_resources
5
import sys
6
import numpy as np
7
import msgpack
8
import msgpack_numpy as m
9
m.patch()
10
import hashlib
11
import random
12
from osim.redis import messages
13
import time
14
15
import logging
16
logger = logging.getLogger(__name__)
17
logger.setLevel(logging.INFO)
18
19
class Client(object):
20
    """
21
        Redis client to interface with osim-rl redis-service
22
23
        The Docker container hosts a redis-server inside the container.
24
        This client connects to the same redis-server, and communicates with the service.
25
26
        The service eventually will reside outside the docker container, and will communicate
27
        with the client only via the redis-server of the docker container.
28
29
        On the instantiation of the docker container, one service will be instantiated parallely.
30
31
        The service will accepts commands at "`service_id`::commands"
32
        where `service_id` is either provided as an `env` variable or is
33
        instantiated to "osim_rl_redis_service_id"
34
    """
35
    def __init__(self, remote_host='127.0.0.1', remote_port=6379, remote_db=0, remote_password=None, verbose=False):
36
        self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password)
37
        self.namespace = "osim-rl"
38
        try:
39
            self.service_id =  os.environ['osim_rl_redis_service_id']
40
        except KeyError:
41
            self.service_id = "osim_rl_redis_service_id"
42
        self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id)
43
        self.verbose = verbose
44
        self.ping_pong()
45
46
    def get_redis_connection(self):
47
        return redis.Redis(connection_pool=self.redis_pool)
48
49
    def _generate_response_channel(self):
50
        random_hash = hashlib.md5("{}".format(random.randint(0, 10**10)).encode('utf-8')).hexdigest()
51
        response_channel = "{}::{}::response::{}".format(   self.namespace,
52
                                                            self.service_id,
53
                                                            random_hash)
54
        return response_channel
55
56
    def _blocking_request(self, _request):
57
        """
58
            request:
59
                -command_type
60
                -payload
61
                -response_channel
62
            response: (on response_channel)
63
                - RESULT
64
            * Send the payload on command_channel (self.namespace+"::command")
65
                ** redis-left-push (LPUSH)
66
            * Keep listening on response_channel (BLPOP)
67
        """
68
        assert type(_request) ==type({})
69
        _request['response_channel'] = self._generate_response_channel()
70
71
        _redis = self.get_redis_connection()
72
        """
73
            The client always pushes in the left
74
            and the service always pushes in the right
75
        """
76
        if self.verbose: print("Request : ", _response)
77
        # Push request in command_channels
78
        payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
79
        _redis.lpush(self.command_channel, payload)
80
        ## TODO: Check if we can use `repr` for json.dumps string serialization
81
        # Wait with a blocking pop for the response
82
        _response = _redis.blpop(_request['response_channel'])[1]
83
        if self.verbose: print("Response : ", _response)
84
        _response = msgpack.unpackb(_response, object_hook=m.decode, encoding="utf8")
85
        if _response['type'] == messages.OSIM_RL.ERROR:
86
            raise Exception(str(_response))
87
        else:
88
            return _response
89
90
    def ping_pong(self):
91
        """
92
            Official Handshake with the grading service
93
            Send a PING
94
            and wait for PONG
95
            If not PONG, raise error
96
        """
97
        _request = {}
98
        _request['type'] = messages.OSIM_RL.PING
99
        _request['payload'] = {}
100
        _response = self._blocking_request(_request)
101
        if _response['type'] != messages.OSIM_RL.PONG:
102
            raise Exception("Unable to perform handshake with the redis service. Expected PONG; received {}".format(json.dumps(_response)))
103
        else:
104
            return True
105
106
    def env_create(self):
107
        _request = {}
108
        _request['type'] = messages.OSIM_RL.ENV_CREATE
109
        _request['payload'] = {}
110
        _response = self._blocking_request(_request)
111
        observation = _response['payload']['observation']
112
        return observation
113
114
    def env_reset(self):
115
        _request = {}
116
        _request['type'] = messages.OSIM_RL.ENV_RESET
117
        _request['payload'] = {}
118
        _response = self._blocking_request(_request)
119
        observation = _response['payload']['observation']
120
        return observation
121
122
    def env_step(self, action, render=False):
123
        """
124
            Respond with [observation, reward, done, info]
125
        """
126
        action = np.array(action).tolist()
127
        _request = {}
128
        _request['type'] = messages.OSIM_RL.ENV_STEP
129
        _request['payload'] = {}
130
        _request['payload']['action'] = action
131
        _response = self._blocking_request(_request)
132
        _payload = _response['payload']
133
        observation = _payload['observation']
134
        reward = _payload['reward']
135
        done = _payload['done']
136
        info = _payload['info']
137
        return [observation, reward, done, info]
138
139
    def submit(self):
140
        _request = {}
141
        _request['type'] = messages.OSIM_RL.ENV_SUBMIT
142
        _request['payload'] = {}
143
        _response = self._blocking_request(_request)
144
        if os.getenv("CROWDAI_BLOCKING_SUBMIT"):
145
            """
146
            If the submission is supposed to happen as a blocking submit,
147
            then wait indefinitely for the evaluator to decide what to 
148
            do with the container.
149
            """
150
            while True:
151
                time.sleep(10)
152
        return _response['payload']