[077a87]: / osim / redis / client.py

Download this file

153 lines (137 with data), 5.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import redis
import json
import os
import pkg_resources
import sys
import numpy as np
import msgpack
import msgpack_numpy as m
m.patch()
import hashlib
import random
from osim.redis import messages
import time
import logging
# Module-level logger named after this module; its threshold is set to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Client(object):
    """
    Redis client to interface with the osim-rl redis-service.

    The Docker container hosts a redis-server inside the container.
    This client connects to the same redis-server and communicates with
    the service. The service will eventually reside outside the docker
    container and will communicate with the client only via the
    redis-server of the docker container.

    On instantiation of the docker container, one service is instantiated
    in parallel. The service accepts commands on the redis list
    "`namespace`::`service_id`::commands", where `namespace` is "osim-rl"
    and `service_id` is either provided through the
    `osim_rl_redis_service_id` environment variable or defaults to
    "osim_rl_redis_service_id".
    """

    def __init__(self, remote_host='127.0.0.1', remote_port=6379,
                 remote_db=0, remote_password=None, verbose=False):
        """
        Create the connection pool and perform the PING/PONG handshake.

        :param remote_host: host of the redis-server
        :param remote_port: port of the redis-server
        :param remote_db: redis database index
        :param remote_password: redis password, or None
        :param verbose: when True, print every request and raw response
        :raises Exception: if the handshake with the service fails
        """
        self.redis_pool = redis.ConnectionPool(
            host=remote_host,
            port=remote_port,
            db=remote_db,
            password=remote_password)
        self.namespace = "osim-rl"
        # The service id may be overridden through the environment.
        self.service_id = os.environ.get(
            'osim_rl_redis_service_id', "osim_rl_redis_service_id")
        self.command_channel = "{}::{}::commands".format(
            self.namespace, self.service_id)
        self.verbose = verbose
        self.ping_pong()

    def get_redis_connection(self):
        """Return a redis connection backed by the shared connection pool."""
        return redis.Redis(connection_pool=self.redis_pool)

    def _generate_response_channel(self):
        """
        Build a per-request response channel name of the form
        "`namespace`::`service_id`::response::`md5-hex`".
        """
        seed = "{}".format(random.randint(0, 10**10)).encode('utf-8')
        random_hash = hashlib.md5(seed).hexdigest()
        return "{}::{}::response::{}".format(
            self.namespace, self.service_id, random_hash)

    def _blocking_request(self, _request):
        """
        Send a request to the service and block until its response arrives.

        request:
            - type
            - payload
            - response_channel (added here; `_request` is mutated in place)
        response: (on response_channel)
            - RESULT

        * Send the payload on command_channel
            ** redis-left-push (LPUSH)
        * Keep listening on response_channel (BLPOP)

        :param _request: dict describing the request
        :returns: the decoded response
        :raises Exception: if the response type is OSIM_RL.ERROR
        """
        assert isinstance(_request, dict)
        _request['response_channel'] = self._generate_response_channel()
        _redis = self.get_redis_connection()
        # The client always pushes in the left
        # and the service always pushes in the right.
        if self.verbose:
            # Log the outgoing request; the response does not exist yet.
            print("Request : ", _request)
        # Push request in command_channel
        payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
        _redis.lpush(self.command_channel, payload)
        # TODO: Check if we can use `repr` for json.dumps string serialization
        # Wait with a blocking pop for the response; BLPOP yields a
        # (key, value) pair, of which only the value is needed.
        _response = _redis.blpop(_request['response_channel'])[1]
        if self.verbose:
            print("Response : ", _response)
        # NOTE(review): the `encoding` keyword was removed in msgpack 1.0
        # (superseded by `raw=False`) - confirm the pinned msgpack version.
        _response = msgpack.unpackb(
            _response, object_hook=m.decode, encoding="utf8")
        if _response['type'] == messages.OSIM_RL.ERROR:
            raise Exception(str(_response))
        return _response

    def ping_pong(self):
        """
        Official handshake with the grading service.

        Send a PING and wait for a PONG.

        :returns: True on a successful handshake
        :raises Exception: if the response is not a PONG
        """
        _request = {'type': messages.OSIM_RL.PING, 'payload': {}}
        _response = self._blocking_request(_request)
        if _response['type'] != messages.OSIM_RL.PONG:
            raise Exception("Unable to perform handshake with the redis service. Expected PONG; received {}".format(json.dumps(_response)))
        return True

    def env_create(self):
        """Send ENV_CREATE and return the observation from the response."""
        _request = {'type': messages.OSIM_RL.ENV_CREATE, 'payload': {}}
        _response = self._blocking_request(_request)
        return _response['payload']['observation']

    def env_reset(self):
        """Send ENV_RESET and return the observation from the response."""
        _request = {'type': messages.OSIM_RL.ENV_RESET, 'payload': {}}
        _response = self._blocking_request(_request)
        return _response['payload']['observation']

    def env_step(self, action, render=False):
        """
        Send ENV_STEP with the given action.

        :param action: the action; converted to a plain list via numpy
        :param render: accepted for interface compatibility; not used here
        :returns: [observation, reward, done, info]
        """
        action = np.array(action).tolist()
        _request = {
            'type': messages.OSIM_RL.ENV_STEP,
            'payload': {'action': action},
        }
        _payload = self._blocking_request(_request)['payload']
        return [
            _payload['observation'],
            _payload['reward'],
            _payload['done'],
            _payload['info'],
        ]

    def submit(self):
        """
        Send ENV_SUBMIT and return the payload of the response.

        If the `CROWDAI_BLOCKING_SUBMIT` environment variable is set to a
        non-empty value, this method never returns (see below).
        """
        _request = {'type': messages.OSIM_RL.ENV_SUBMIT, 'payload': {}}
        _response = self._blocking_request(_request)
        if os.getenv("CROWDAI_BLOCKING_SUBMIT"):
            # If the submission is supposed to happen as a blocking submit,
            # then wait indefinitely for the evaluator to decide what to
            # do with the container.
            while True:
                time.sleep(10)
        return _response['payload']