Diff of /osim/redis/service.py [000000] .. [077a87]

Switch to unified view

a b/osim/redis/service.py
1
#!/usr/bin/env python
2
from __future__ import print_function
3
import redis
4
from osim.redis import messages
5
import json
6
import numpy as np
7
import msgpack
8
import msgpack_numpy as m
9
m.patch()
10
import osim
11
from osim.env import *
12
import os
13
import timeout_decorator
14
import time
15
16
########################################################
17
# CONSTANTS
18
########################################################
19
PER_STEP_TIMEOUT = 20*60 # 20minutes
20
21
22
class OsimRlRedisService:
23
    def __init__(   self,
24
                    osim_rl_redis_service_id = 'osim_rl_redis_service_id',
25
                    seed_map = False,
26
                    max_steps = 1000,
27
                    remote_host = '127.0.0.1',
28
                    remote_port = 6379,
29
                    remote_db = 0,
30
                    remote_password = None,
31
                    difficulty = 1,
32
                    max_obstacles = 10,
33
                    visualize = False,
34
                    report = None,
35
                    verbose = False):
36
        """
37
            TODO: Expose more RunEnv related variables
38
        """
39
        print("Attempting to connect to redis server at {}:{}/{}".format(remote_host, remote_port, remote_db))
40
        self.remote_host = remote_host
41
        self.remote_port = remote_port
42
        self.remote_db = remote_db
43
        self.remote_password = remote_password
44
45
        self.redis_pool = redis.ConnectionPool(host=remote_host, port=remote_port, db=remote_db, password=remote_password)
46
        self.namespace = "osim-rl"
47
        self.service_id = osim_rl_redis_service_id
48
        self.command_channel = "{}::{}::commands".format(self.namespace, self.service_id)
49
        self.env = False
50
        self.env_available = False
51
        self.reward = 0
52
        self.simulation_count = 0
53
        self.simualation_rewards = []
54
        self.simulation_times = []
55
        self.begin_simulation = False
56
        self.current_step = 0
57
        self.difficulty = difficulty
58
        self.max_obstacles = max_obstacles
59
        self.verbose = verbose
60
        self.visualize = visualize
61
        self.report = report
62
        self.max_steps = max_steps
63
        self.initalize_seed_map(seed_map)
64
65
    def initalize_seed_map(self, seed_map_string):
66
        if seed_map_string:
67
            assert type(seed_map_string) == type("")
68
            seed_map = seed_map_string.split(",")
69
            seed_map = [int(x) for x in seed_map]
70
            self.seed_map = seed_map
71
        else:
72
            self.seed_map = [np.random.randint(0,10**10)]
73
74
    def get_redis_connection(self):
75
        redis_conn = redis.Redis(connection_pool=self.redis_pool)
76
        try:
77
            redis_conn.ping()
78
        except:
79
            raise Exception(
80
                    "Unable to connect to redis server at {}:{} ."
81
                    "Are you sure there is a redis-server running at the "
82
                    "specified location ?".format(
83
                        self.remote_host,
84
                        self.remote_port
85
                        )
86
                    )
87
        return redis_conn
88
89
    def _error_template(self, payload):
90
        _response = {}
91
        _response['type'] = messages.OSIM_RL.ERROR
92
        _response['payload'] = payload
93
        return _response
94
95
    @timeout_decorator.timeout(PER_STEP_TIMEOUT)# timeout for each command
96
    def get_next_command(self, _redis):
97
        command = _redis.brpop(self.command_channel)[1]
98
        return command
99
100
    def run(self):
101
        print("Listening for commands at : ", self.command_channel)
102
        while True:
103
            try:
104
                _redis = self.get_redis_connection()
105
                command = self.get_next_command(_redis)
106
                print("Command Service: ", command)
107
            except timeout_decorator.timeout_decorator.TimeoutError:
108
                raise Exception("Timeout in step {} of simulation {}".format(self.current_step, self.simulation_count))
109
            command_response_channel = "default_response_channel"
110
            if self.verbose: print("Self.Reward : ", self.reward)
111
            if self.verbose: print("Current Simulation : ", self.simulation_count)
112
            if self.seed_map and self.verbose and self.simulation_count < len(self.seed_map): print("Current SEED : ", self.seed_map[self.simulation_count])
113
            try:
114
                # command = json.loads(command.decode('utf-8'))
115
                command = msgpack.unpackb(command, object_hook=m.decode, encoding="utf8")
116
                if self.verbose: print("Received Request : ", command)
117
                command_response_channel = command['response_channel']
118
                if command['type'] == messages.OSIM_RL.PING:
119
                    """
120
                        INITIAL HANDSHAKE : Respond with PONG
121
                    """
122
                    _command_response = {}
123
                    _command_response['type'] = messages.OSIM_RL.PONG
124
                    _command_response['payload'] = {}
125
                    if self.verbose: print("Responding with : ", _command_response)
126
                    _redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True))
127
                elif command['type'] == messages.OSIM_RL.ENV_CREATE:
128
                    """
129
                        ENV_CREATE
130
131
                        Respond with initial observation
132
                    """
133
                    _payload = command['payload']
134
135
                    if self.env: #If env already exists, throw an error
136
                        _error_message = "Attempt to create environment when one already exists."
137
                        if self.verbose: print("Responding with : ", self._error_template(_error_message))
138
                        _redis.rpush( command_response_channel, self._error_template(_error_message))
139
                        return self._error_template(_error_message)
140
                    else:
141
                        self.env = L2M2019Env(  visualize = self.visualize,
142
                                                    difficulty = self.difficulty,
143
                                                    seed = self.seed_map[self.simulation_count],
144
                                                    report = self.report
145
                        )
146
                        _observation = self.env.reset(project=False)
147
                        self.begin_simulation = time.time()
148
                        self.simualation_rewards.append(0)
149
                        self.env_available = True
150
                        self.current_step = 0
151
                        #_observation = np.array(_observation).tolist()
152
                        if self.report:
153
                            """
154
                                In case of reporting mode, truncate to the first
155
                                41 observations.
156
                                (The rest are extra activations which are used only for reporting
157
                                and should not be available to the agent)
158
                            """
159
                            #_observation = _observation[:41]
160
                            pass
161
162
                        _command_response = {}
163
                        _command_response['type'] = messages.OSIM_RL.ENV_CREATE_RESPONSE
164
                        _command_response['payload'] = {}
165
                        _command_response['payload']['observation'] = _observation
166
                        if self.verbose: print("Responding with : ", _command_response)
167
                        _redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True))
168
                elif command['type'] == messages.OSIM_RL.ENV_RESET:
169
                    """
170
                        ENV_RESET
171
172
                        Respond with observation from next simulation or
173
                        False if no simulations are left
174
                    """
175
                    self.simulation_count += 1
176
                    if self.begin_simulation:
177
                        self.simulation_times.append(time.time()-self.begin_simulation)
178
                        self.begin_simulation = time.time()
179
                    if self.seed_map and self.simulation_count < len(self.seed_map):
180
                        _observation = self.env.reset(seed=self.seed_map[self.simulation_count], project=False)
181
                        self.simualation_rewards.append(0)
182
                        self.env_available = True
183
                        self.current_step = 0
184
                        #_observation = list(_observation)
185
                        if self.report:
186
                            """
187
                                In case of reporting mode, truncate to the first
188
                                41 observations.
189
                                (The rest are extra activations which are used only for reporting
190
                                and should not be available to the agent)
191
                            """
192
                            #_observation = _observation[:41]
193
                            pass
194
195
                        _command_response = {}
196
                        _command_response['type'] = messages.OSIM_RL.ENV_RESET_RESPONSE
197
                        _command_response['payload'] = {}
198
                        _command_response['payload']['observation'] = _observation
199
                        if self.verbose: print("Responding with : ", _command_response)
200
                        _redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True))
201
                    else:
202
                        _command_response = {}
203
                        _command_response['type'] = messages.OSIM_RL.ENV_RESET_RESPONSE
204
                        _command_response['payload'] = {}
205
                        _command_response['payload']['observation'] = False
206
                        if self.verbose: print("Responding with : ", _command_response)
207
                        _redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True))
208
                elif command['type'] == messages.OSIM_RL.ENV_STEP:
209
                    """
210
                        ENV_STEP
211
212
                        Request : Action array
213
                        Respond with updated [observation,reward,done,info] after step
214
                    """
215
                    args = command['payload']
216
                    action = args['action']
217
                    action = np.array(action)
218
                    if self.env and self.env_available:
219
                        [_observation, reward, done, info] = self.env.step(action, project=False)
220
                    else:
221
                        if self.env:
222
                            raise Exception("Attempt to call `step` function after max_steps={} in a single simulation. Please reset your environment before calling the `step` function after max_step s".format(self.max_steps))
223
                        else:
224
                                raise Exception("Attempt to call `step` function on a non existent `env`")
225
                    self.reward += reward
226
                    self.simualation_rewards[-1] += reward
227
                    self.current_step += 1
228
                    #_observation = np.array(_observation).tolist()
229
                    if self.report:
230
                        """
231
                            In case of reporting mode, truncate to the first
232
                            41 observations.
233
                            (The rest are extra activations which are used only for reporting
234
                            and should not be available to the agent)
235
                        """
236
                        #_observation = _observation[:41]
237
                        pass
238
239
                    if self.current_step >= self.max_steps:
240
                        _command_response = {}
241
                        _command_response['type'] = messages.OSIM_RL.ENV_STEP_RESPONSE
242
                        _command_response['payload'] = {}
243
                        _command_response['payload']['observation'] = _observation
244
                        _command_response['payload']['reward'] = reward
245
                        _command_response['payload']['done'] = True
246
                        _command_response['payload']['info'] = info
247
248
                        """
249
                        Mark env as unavailable until next reset
250
                        """
251
                        self.env_available = False
252
                    else:
253
                        _command_response = {}
254
                        _command_response['type'] = messages.OSIM_RL.ENV_STEP_RESPONSE
255
                        _command_response['payload'] = {}
256
                        _command_response['payload']['observation'] = _observation
257
                        _command_response['payload']['reward'] = reward
258
                        _command_response['payload']['done'] = done
259
                        _command_response['payload']['info'] = info
260
261
                        if done:
262
                            """
263
                                Mark env as unavailable until next reset
264
                            """
265
                            self.env_available = False
266
                    if self.verbose: print("Responding with : ", _command_response)
267
                    if self.verbose: print("Current Step : ", self.current_step)
268
                    _redis.rpush(command_response_channel, msgpack.packb(_command_response, default=m.encode, use_bin_type=True))
269
                elif command['type'] == messages.OSIM_RL.ENV_SUBMIT:
270
                    """
271
                        ENV_SUBMIT
272
273
                        Submit the final cumulative reward
274
                    """
275
                    _response = {}
276
                    _response['type'] = messages.OSIM_RL.ENV_SUBMIT_RESPONSE
277
                    _payload = {}
278
                    _payload['mean_reward'] = np.float(self.reward)/len(self.seed_map) #Mean reward
279
                    _payload['simulation_rewards'] = self.simualation_rewards
280
                    _payload['simulation_times'] = self.simulation_times
281
                    _response['payload'] = _payload
282
                    _redis.rpush(command_response_channel, msgpack.packb(_response, default=m.encode, use_bin_type=True))
283
                elif command['type'] == messages.OSIM_RL.ENV_SUBMIT:
284
                    if self.verbose: print("Responding with : ", _response)
285
                    return _response
286
                else:
287
                    _error = self._error_template(
288
                                    "UNKNOWN_REQUEST:{}".format(
289
                                        str(command)))
290
                    if self.verbose:print("Responding with : ", _error)
291
                    _redis.rpush(command_response_channel, msgpack.packb(_error, default=m.encode, use_bin_type=True))
292
                    return _error
293
294
            except Exception as e:
295
                print("Error : ", str(e))
296
                _redis.rpush(   command_response_channel,
297
                                msgpack.packb(self._error_template(str(e)), default=m.encode, use_bin_type=True))
298
                return self._error_template(str(e))
299
300
if __name__ == "__main__":
301
    import argparse
302
    parser = argparse.ArgumentParser(description='Submit the result to AIcrowd')
303
    parser.add_argument('--port', dest='port', action='store', required=True)
304
    parser.add_argument('--service_id', dest='service_id', default='osim_rl_redis_service_id', required=False)
305
    parser.add_argument('--seed_map',
306
                        dest='seed_map',
307
                        default="11,22,33",
308
                        help="comma separated list of seed values",
309
                        required=False)
310
    args = parser.parse_args()
311
    
312
    seed_map = args.seed_map
313
    print("Seeds : ", seed_map.split(","))
314
    grader = OsimRlRedisService(osim_rl_redis_service_id=args.service_id, remote_port=int(args.port), seed_map=seed_map, max_steps=1000, verbose=True)
315
    result = grader.run()
316
    if result['type'] == messages.OSIM_RL.ENV_SUBMIT_RESPONSE:
317
        cumulative_results = result['payload']
318
        print("Results : ", cumulative_results)
319
    elif result['type'] == messages.OSIM_RL.ERROR:
320
        error = result['payload']
321
        raise Exception("Evaluation Failed : {}".format(str(error)))
322
    else:
323
        #Evaluation failed
324
        print("Evaluation Failed : ", result['payload'])