|
a |
|
b/osim/redis/service.py |
|
|
1 |
#!/usr/bin/env python |
|
|
2 |
from __future__ import print_function |
|
|
3 |
import redis |
|
|
4 |
from osim.redis import messages |
|
|
5 |
import json |
|
|
6 |
import numpy as np |
|
|
7 |
import msgpack |
|
|
8 |
import msgpack_numpy as m |
|
|
9 |
m.patch() |
|
|
10 |
import osim |
|
|
11 |
from osim.env import * |
|
|
12 |
import os |
|
|
13 |
import timeout_decorator |
|
|
14 |
import time |
|
|
15 |
|
|
|
16 |
######################################################## |
|
|
17 |
# CONSTANTS |
|
|
18 |
######################################################## |
|
|
19 |
PER_STEP_TIMEOUT = 20*60 # 20minutes |
|
|
20 |
|
|
|
21 |
|
|
|
22 |
class OsimRlRedisService: |
|
|
23 |
def __init__( self, |
|
|
24 |
osim_rl_redis_service_id = 'osim_rl_redis_service_id', |
|
|
25 |
seed_map = False, |
|
|
26 |
max_steps = 1000, |
|
|
27 |
remote_host = '127.0.0.1', |
|
|
28 |
remote_port = 6379, |
|
|
29 |
remote_db = 0, |
|
|
30 |
remote_password = None, |
|
|
31 |
difficulty = 1, |
|
|
32 |
max_obstacles = 10, |
|
|
33 |
visualize = False, |
|
|
34 |
report = None, |
|
|
35 |
verbose = False): |
|
|
36 |
""" |
|
|
37 |
TODO: Expose more RunEnv related variables |
|
|
38 |
""" |
|
|
39 |
print("Attempting to connect to redis server at {}:{}/{}".format(remote_host, remote_port, remote_db)) |
|
|
40 |
self.remote_host = remote_host |
|
|
41 |
self.remote_port = remote_port |
|
|
42 |
self.remote_db = remote_db |
|
|
43 |
self.remote_password = remote_password |
|
|
44 |
|
|
|
45 |
self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password) |
|
|
46 |
self.namespace = "osim-rl" |
|
|
47 |
self.service_id = osim_rl_redis_service_id |
|
|
48 |
self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id) |
|
|
49 |
self.env = False |
|
|
50 |
self.env_available = False |
|
|
51 |
self.reward = 0 |
|
|
52 |
self.simulation_count = 0 |
|
|
53 |
self.simualation_rewards = [] |
|
|
54 |
self.simulation_times = [] |
|
|
55 |
self.begin_simulation = False |
|
|
56 |
self.current_step = 0 |
|
|
57 |
self.difficulty = difficulty |
|
|
58 |
self.max_obstacles = max_obstacles |
|
|
59 |
self.verbose = verbose |
|
|
60 |
self.visualize = visualize |
|
|
61 |
self.report = report |
|
|
62 |
self.max_steps = max_steps |
|
|
63 |
self.initalize_seed_map(seed_map) |
|
|
64 |
|
|
|
65 |
def initalize_seed_map(self, seed_map_string): |
|
|
66 |
if seed_map_string: |
|
|
67 |
assert type(seed_map_string) == type("") |
|
|
68 |
seed_map = seed_map_string.split(",") |
|
|
69 |
seed_map = [int(x) for x in seed_map] |
|
|
70 |
self.seed_map = seed_map |
|
|
71 |
else: |
|
|
72 |
self.seed_map = [np.random.randint(0,10**10)] |
|
|
73 |
|
|
|
74 |
def get_redis_connection(self): |
|
|
75 |
redis_conn = redis.Redis(connection_pool=self.redis_pool) |
|
|
76 |
try: |
|
|
77 |
redis_conn.ping() |
|
|
78 |
except: |
|
|
79 |
raise Exception( |
|
|
80 |
"Unable to connect to redis server at {}:{} ." |
|
|
81 |
"Are you sure there is a redis-server running at the " |
|
|
82 |
"specified location ?".format( |
|
|
83 |
self.remote_host, |
|
|
84 |
self.remote_port |
|
|
85 |
) |
|
|
86 |
) |
|
|
87 |
return redis_conn |
|
|
88 |
|
|
|
89 |
def _error_template(self, payload): |
|
|
90 |
_response = {} |
|
|
91 |
_response['type'] = messages.OSIM_RL.ERROR |
|
|
92 |
_response['payload'] = payload |
|
|
93 |
return _response |
|
|
94 |
|
|
|
95 |
@timeout_decorator.timeout(PER_STEP_TIMEOUT)# timeout for each command |
|
|
96 |
def get_next_command(self, _redis): |
|
|
97 |
command = _redis.brpop(self.command_channel)[1] |
|
|
98 |
return command |
|
|
99 |
|
|
|
100 |
def run(self): |
|
|
101 |
print("Listening for commands at : ", self.command_channel) |
|
|
102 |
while True: |
|
|
103 |
try: |
|
|
104 |
_redis = self.get_redis_connection() |
|
|
105 |
command = self.get_next_command(_redis) |
|
|
106 |
print("Command Service: ", command) |
|
|
107 |
except timeout_decorator.timeout_decorator.TimeoutError: |
|
|
108 |
raise Exception("Timeout in step {} of simulation {}".format(self.current_step, self.simulation_count)) |
|
|
109 |
command_response_channel = "default_response_channel" |
|
|
110 |
if self.verbose: print("Self.Reward : ", self.reward) |
|
|
111 |
if self.verbose: print("Current Simulation : ", self.simulation_count) |
|
|
112 |
if self.seed_map and self.verbose and self.simulation_count < len(self.seed_map): print("Current SEED : ", self.seed_map[self.simulation_count]) |
|
|
113 |
try: |
|
|
114 |
# command = json.loads(command.decode('utf-8')) |
|
|
115 |
command = msgpack.unpackb(command, object_hook=m.decode, encoding="utf8") |
|
|
116 |
if self.verbose: print("Received Request : ", command) |
|
|
117 |
command_response_channel = command['response_channel'] |
|
|
118 |
if command['type'] == messages.OSIM_RL.PING: |
|
|
119 |
""" |
|
|
120 |
INITIAL HANDSHAKE : Respond with PONG |
|
|
121 |
""" |
|
|
122 |
_command_response = {} |
|
|
123 |
_command_response['type'] = messages.OSIM_RL.PONG |
|
|
124 |
_command_response['payload'] = {} |
|
|
125 |
if self.verbose: print("Responding with : ", _command_response) |
|
|
126 |
_redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True)) |
|
|
127 |
elif command['type'] == messages.OSIM_RL.ENV_CREATE: |
|
|
128 |
""" |
|
|
129 |
ENV_CREATE |
|
|
130 |
|
|
|
131 |
Respond with initial observation |
|
|
132 |
""" |
|
|
133 |
_payload = command['payload'] |
|
|
134 |
|
|
|
135 |
if self.env: #If env already exists, throw an error |
|
|
136 |
_error_message = "Attempt to create environment when one already exists." |
|
|
137 |
if self.verbose: print("Responding with : ", self._error_template(_error_message)) |
|
|
138 |
_redis.rpush( command_response_channel, self._error_template(_error_message)) |
|
|
139 |
return self._error_template(_error_message) |
|
|
140 |
else: |
|
|
141 |
self.env = L2M2019Env( visualize = self.visualize, |
|
|
142 |
difficulty = self.difficulty, |
|
|
143 |
seed = self.seed_map[self.simulation_count], |
|
|
144 |
report = self.report |
|
|
145 |
) |
|
|
146 |
_observation = self.env.reset(project=False) |
|
|
147 |
self.begin_simulation = time.time() |
|
|
148 |
self.simualation_rewards.append(0) |
|
|
149 |
self.env_available = True |
|
|
150 |
self.current_step = 0 |
|
|
151 |
#_observation = np.array(_observation).tolist() |
|
|
152 |
if self.report: |
|
|
153 |
""" |
|
|
154 |
In case of reporting mode, truncate to the first |
|
|
155 |
41 observations. |
|
|
156 |
(The rest are extra activations which are used only for reporting |
|
|
157 |
and should not be available to the agent) |
|
|
158 |
""" |
|
|
159 |
#_observation = _observation[:41] |
|
|
160 |
pass |
|
|
161 |
|
|
|
162 |
_command_response = {} |
|
|
163 |
_command_response['type'] = messages.OSIM_RL.ENV_CREATE_RESPONSE |
|
|
164 |
_command_response['payload'] = {} |
|
|
165 |
_command_response['payload']['observation'] = _observation |
|
|
166 |
if self.verbose: print("Responding with : ", _command_response) |
|
|
167 |
_redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True)) |
|
|
168 |
elif command['type'] == messages.OSIM_RL.ENV_RESET: |
|
|
169 |
""" |
|
|
170 |
ENV_RESET |
|
|
171 |
|
|
|
172 |
Respond with observation from next simulation or |
|
|
173 |
False if no simulations are left |
|
|
174 |
""" |
|
|
175 |
self.simulation_count += 1 |
|
|
176 |
if self.begin_simulation: |
|
|
177 |
self.simulation_times.append(time.time()-self.begin_simulation) |
|
|
178 |
self.begin_simulation = time.time() |
|
|
179 |
if self.seed_map and self.simulation_count < len(self.seed_map): |
|
|
180 |
_observation = self.env.reset(seed=self.seed_map[self.simulation_count], project=False) |
|
|
181 |
self.simualation_rewards.append(0) |
|
|
182 |
self.env_available = True |
|
|
183 |
self.current_step = 0 |
|
|
184 |
#_observation = list(_observation) |
|
|
185 |
if self.report: |
|
|
186 |
""" |
|
|
187 |
In case of reporting mode, truncate to the first |
|
|
188 |
41 observations. |
|
|
189 |
(The rest are extra activations which are used only for reporting |
|
|
190 |
and should not be available to the agent) |
|
|
191 |
""" |
|
|
192 |
#_observation = _observation[:41] |
|
|
193 |
pass |
|
|
194 |
|
|
|
195 |
_command_response = {} |
|
|
196 |
_command_response['type'] = messages.OSIM_RL.ENV_RESET_RESPONSE |
|
|
197 |
_command_response['payload'] = {} |
|
|
198 |
_command_response['payload']['observation'] = _observation |
|
|
199 |
if self.verbose: print("Responding with : ", _command_response) |
|
|
200 |
_redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True)) |
|
|
201 |
else: |
|
|
202 |
_command_response = {} |
|
|
203 |
_command_response['type'] = messages.OSIM_RL.ENV_RESET_RESPONSE |
|
|
204 |
_command_response['payload'] = {} |
|
|
205 |
_command_response['payload']['observation'] = False |
|
|
206 |
if self.verbose: print("Responding with : ", _command_response) |
|
|
207 |
_redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True)) |
|
|
208 |
elif command['type'] == messages.OSIM_RL.ENV_STEP: |
|
|
209 |
""" |
|
|
210 |
ENV_STEP |
|
|
211 |
|
|
|
212 |
Request : Action array |
|
|
213 |
Respond with updated [observation,reward,done,info] after step |
|
|
214 |
""" |
|
|
215 |
args = command['payload'] |
|
|
216 |
action = args['action'] |
|
|
217 |
action = np.array(action) |
|
|
218 |
if self.env and self.env_available: |
|
|
219 |
[_observation, reward, done, info] = self.env.step(action, project=False) |
|
|
220 |
else: |
|
|
221 |
if self.env: |
|
|
222 |
raise Exception("Attempt to call `step` function after max_steps={} in a single simulation. Please reset your environment before calling the `step` function after max_step s".format(self.max_steps)) |
|
|
223 |
else: |
|
|
224 |
raise Exception("Attempt to call `step` function on a non existent `env`") |
|
|
225 |
self.reward += reward |
|
|
226 |
self.simualation_rewards[-1] += reward |
|
|
227 |
self.current_step += 1 |
|
|
228 |
#_observation = np.array(_observation).tolist() |
|
|
229 |
if self.report: |
|
|
230 |
""" |
|
|
231 |
In case of reporting mode, truncate to the first |
|
|
232 |
41 observations. |
|
|
233 |
(The rest are extra activations which are used only for reporting |
|
|
234 |
and should not be available to the agent) |
|
|
235 |
""" |
|
|
236 |
#_observation = _observation[:41] |
|
|
237 |
pass |
|
|
238 |
|
|
|
239 |
if self.current_step >= self.max_steps: |
|
|
240 |
_command_response = {} |
|
|
241 |
_command_response['type'] = messages.OSIM_RL.ENV_STEP_RESPONSE |
|
|
242 |
_command_response['payload'] = {} |
|
|
243 |
_command_response['payload']['observation'] = _observation |
|
|
244 |
_command_response['payload']['reward'] = reward |
|
|
245 |
_command_response['payload']['done'] = True |
|
|
246 |
_command_response['payload']['info'] = info |
|
|
247 |
|
|
|
248 |
""" |
|
|
249 |
Mark env as unavailable until next reset |
|
|
250 |
""" |
|
|
251 |
self.env_available = False |
|
|
252 |
else: |
|
|
253 |
_command_response = {} |
|
|
254 |
_command_response['type'] = messages.OSIM_RL.ENV_STEP_RESPONSE |
|
|
255 |
_command_response['payload'] = {} |
|
|
256 |
_command_response['payload']['observation'] = _observation |
|
|
257 |
_command_response['payload']['reward'] = reward |
|
|
258 |
_command_response['payload']['done'] = done |
|
|
259 |
_command_response['payload']['info'] = info |
|
|
260 |
|
|
|
261 |
if done: |
|
|
262 |
""" |
|
|
263 |
Mark env as unavailable until next reset |
|
|
264 |
""" |
|
|
265 |
self.env_available = False |
|
|
266 |
if self.verbose: print("Responding with : ", _command_response) |
|
|
267 |
if self.verbose: print("Current Step : ", self.current_step) |
|
|
268 |
_redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True)) |
|
|
269 |
elif command['type'] == messages.OSIM_RL.ENV_SUBMIT: |
|
|
270 |
""" |
|
|
271 |
ENV_SUBMIT |
|
|
272 |
|
|
|
273 |
Submit the final cumulative reward |
|
|
274 |
""" |
|
|
275 |
_response = {} |
|
|
276 |
_response['type'] = messages.OSIM_RL.ENV_SUBMIT_RESPONSE |
|
|
277 |
_payload = {} |
|
|
278 |
_payload['mean_reward'] = np.float(self.reward)/len(self.seed_map) #Mean reward |
|
|
279 |
_payload['simulation_rewards'] = self.simualation_rewards |
|
|
280 |
_payload['simulation_times'] = self.simulation_times |
|
|
281 |
_response['payload'] = _payload |
|
|
282 |
_redis.rpush(command_response_channel, msgpack.packb(_response, default=m.encode, use_bin_type=True)) |
|
|
283 |
elif command['type'] == messages.OSIM_RL.ENV_SUBMIT: |
|
|
284 |
if self.verbose: print("Responding with : ", _response) |
|
|
285 |
return _response |
|
|
286 |
else: |
|
|
287 |
_error = self._error_template( |
|
|
288 |
"UNKNOWN_REQUEST:{}".format( |
|
|
289 |
str(command))) |
|
|
290 |
if self.verbose:print("Responding with : ", _error) |
|
|
291 |
_redis.rpush(command_response_channel, msgpack.packb(_error, default=m.encode, use_bin_type=True)) |
|
|
292 |
return _error |
|
|
293 |
|
|
|
294 |
except Exception as e: |
|
|
295 |
print("Error : ", str(e)) |
|
|
296 |
_redis.rpush( command_response_channel, |
|
|
297 |
msgpack.packb(self._error_template(str(e)), default=m.encode, use_bin_type=True)) |
|
|
298 |
return self._error_template(str(e)) |
|
|
299 |
|
|
|
300 |
if __name__ == "__main__": |
|
|
301 |
import argparse |
|
|
302 |
parser = argparse.ArgumentParser(description='Submit the result to AIcrowd') |
|
|
303 |
parser.add_argument('--port', dest='port', action='store', required=True) |
|
|
304 |
parser.add_argument('--service_id', dest='service_id', default='osim_rl_redis_service_id', required=False) |
|
|
305 |
parser.add_argument('--seed_map', |
|
|
306 |
dest='seed_map', |
|
|
307 |
default="11,22,33", |
|
|
308 |
help="comma separated list of seed values", |
|
|
309 |
required=False) |
|
|
310 |
args = parser.parse_args() |
|
|
311 |
|
|
|
312 |
seed_map = args.seed_map |
|
|
313 |
print("Seeds : ", seed_map.split(",")) |
|
|
314 |
grader = OsimRlRedisService(osim_rl_redis_service_id=args.service_id, remote_port=int(args.port), seed_map=seed_map, max_steps=1000, verbose=True) |
|
|
315 |
result = grader.run() |
|
|
316 |
if result['type'] == messages.OSIM_RL.ENV_SUBMIT_RESPONSE: |
|
|
317 |
cumulative_results = result['payload'] |
|
|
318 |
print("Results : ", cumulative_results) |
|
|
319 |
elif result['type'] == messages.OSIM_RL.ERROR: |
|
|
320 |
error = result['payload'] |
|
|
321 |
raise Exception("Evaluation Failed : {}".format(str(error))) |
|
|
322 |
else: |
|
|
323 |
#Evaluation failed |
|
|
324 |
print("Evaluation Failed : ", result['payload']) |