--- a
+++ b/ADDPG/model.py
@@ -0,0 +1,363 @@
+# -----------------------------------
+# Deep Deterministic Policy Gradient
+# Author: Kaizhao Liang, Hang Yu
+# Date: 08.21.2017
+# -----------------------------------
+import tensorflow as tf
+import numpy as np
+from ou_noise import OUNoise
+from critic_network import CriticNetwork
+from actor_network import ActorNetwork
+from replay_buffer import ReplayBuffer
+from helper import *
+from time import gmtime, strftime, sleep
+
+import opensim as osim
+from osim.http.client import Client
+from osim.env import *
+
+import multiprocessing
+from multiprocessing import Process, Pipe
+
+# [Hack] RunEnv appears to leak memory; workaround adapted from issue #58:
+# https://github.com/stanfordnmbl/osim-rl/issues/58
+# Each RunEnv instance is held in its own process. This is necessary because
+# several RunEnv() instances in the same process result in interleaved simulations.
+def standalone_headless_isolated(conn,vis):
+    e = RunEnv(visualize=vis)
+    while True:
+        try:
+            msg = conn.recv()
+
+            # messages should be tuples; msg[0] should be a string
+            if msg[0] == 'reset':
+                o = e.reset(difficulty=2)
+                conn.send(o)
+            elif msg[0] == 'step':
+                ordi = e.step(msg[1])
+                conn.send(ordi)
+            else:
+                conn.close()
+                del e
+                return
+        except:
+            conn.close()
+            del e
+            raise
+
+# Class that manages the interprocess communication and exposes itself as a RunEnv.
+class ei: # Environment Instance
+    def __init__(self,vis):
+        self.pc, self.cc = Pipe()
+        self.p = Process(
+            target=standalone_headless_isolated,
+            args=(self.cc,vis,)
+        )
+        self.p.daemon = True
+        self.p.start()
+
+    def reset(self):
+        self.pc.send(('reset',))
+        return self.pc.recv()
+
+    def step(self,actions):
+        self.pc.send(('step',actions,))
+        try:
+            return self.pc.recv()
+        except:
+            print('Error in recv()')
+            raise
+
+    def __del__(self):
+        self.pc.send(('exit',))
+        #print('(ei)waiting for join...')
+        self.p.join()
+        try:
+            del self.pc
+            del self.cc
+            del self.p
+        except:
+            raise
+
+###############################################
+# DDPG Worker
+###############################################
+pause_perceive = False
+replay_buffer = ReplayBuffer(1e6)
+
+class Worker:
+    """Asynchronous DDPG worker."""
+    def __init__(self,sess,number,model_path,global_episodes,explore,training,vis,batch_size,gamma,n_step,global_actor_net):
+        self.name = 'worker_' + str(number) # name for uploading results
+        self.number = number
+        # Randomly initialize the actor and critic networks,
+        # together with their target networks
+        self.state_dim = 41+3+14 # 41 observations plus 17 induced velocities
+        self.action_dim = 18
+        self.model_path = model_path
+        self.global_episodes = global_episodes
+        self.increment = self.global_episodes.assign_add(1)
+        self.sess = sess
+        self.explore = explore
+        self.noise_decay = 1.
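+        # noise_decay scales the OU exploration noise added in noise_action()
+        # and the length of the scripted "demo" warm-up; it starts at 1 and is
+        # annealed linearly over `explore` episodes in work() below.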
+        self.training = training
+        self.vis = vis # True only during testing
+        self.total_steps = 0 # step counter for the ReplayBuffer
+        self.batch_size = batch_size
+        self.gamma = gamma
+        self.n_step = n_step
+        # Initialize the Ornstein-Uhlenbeck process for action exploration
+        self.exploration_noise = OUNoise(self.action_dim)
+
+        self.actor_network = ActorNetwork(self.sess,self.state_dim,self.action_dim,self.name+'/actor')
+        self.update_local_actor = update_graph(global_actor_net,self.actor_network.net)
+        if self.name == 'worker_1':
+            self.critic_network = CriticNetwork(self.sess,self.state_dim,self.action_dim,self.name+'/critic')
+            self.actor_network.update_target(sess)
+            self.critic_network.update_target(sess)
+            self.update_global_actor = update_graph(self.actor_network.net,global_actor_net)
+
+    def start(self):
+        self.env = ei(vis=self.vis) #RunEnv(visualize=True)
+
+    def restart(self): # restart the env every ? episodes to counter the memory leak
+        if self.env is not None:
+            del self.env
+            sleep(0.001)
+        self.env = ei(vis=self.vis)
+
+    def train(self):
+        # Sample a random minibatch of N transitions from the replay buffer
+        global replay_buffer
+        minibatch = replay_buffer.get_batch(self.batch_size)
+
+        BATCH_SIZE = self.batch_size
+        #print(self.batch_size)
+        state_batch = np.asarray([data[0] for data in minibatch])
+        action_batch = np.asarray([data[1] for data in minibatch])
+        reward_batch = np.asarray([data[2] for data in minibatch])
+        next_state_batch = np.asarray([data[3] for data in minibatch])
+        done_batch = np.asarray([data[4] for data in minibatch])
+
+        # reshape to [BATCH_SIZE, action_dim]
+        action_batch = np.resize(action_batch,[BATCH_SIZE,self.action_dim])
+
+        # Calculate y_batch from the target networks
+        next_action_batch = self.actor_network.target_actions(self.sess,next_state_batch)
+        q_value_batch = self.critic_network.target_q(self.sess,next_state_batch,next_action_batch)
+
+        done_mask = np.asarray([0. if done else 1. for done in done_batch])
+        y_batch = reward_batch + self.gamma**self.n_step * q_value_batch * done_mask
+        y_batch = np.resize(y_batch,[BATCH_SIZE,1])
+        # Update the critic by minimizing the loss L
+        _,loss,a,b,norm = self.critic_network.train(self.sess,y_batch,state_batch,action_batch)
+        #print(a)
+        #print(b)
+        #print(loss)
+        #print(norm)
+
+        # Update the actor policy using the sampled policy gradient:
+        action_batch_for_gradients = self.actor_network.actions(self.sess,state_batch)
+        q_gradient_batch = self.critic_network.gradients(self.sess,state_batch,action_batch_for_gradients)
+        q_gradient_batch *= -1.
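+        # The critic returns dQ/da evaluated at a = pi(s). The actor optimizer
+        # performs gradient descent, so flipping the sign turns the update into
+        # an ascent step on Q(s, pi(s)) -- the sampled deterministic policy gradient.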
+        '''
+        # inverting-gradients formula: dq = (a_max-a) / (a_max - a_min) if dq>0,
+        # else dq = (a - a_min) / (a_max - a_min)
+        for i in range(BATCH_SIZE): # with the clipped action bounds a_max = 0.95, a_min = 0.05
+            for j in range(18):
+                dq = q_gradient_batch[i,j]
+                a = action_batch_for_gradients[i,j]
+                if dq > 0.:
+                    q_gradient_batch[i,j] *= (0.95-a)
+                else:
+                    q_gradient_batch[i,j] *= a-0.05
+        '''
+
+        _,norm = self.actor_network.train(self.sess,q_gradient_batch,state_batch)
+        #print(norm)
+        # Update the target networks and push the local actor to the global one
+        self.actor_network.update_target(self.sess)
+        self.critic_network.update_target(self.sess)
+        self.sess.run(self.update_global_actor)
+
+    def save_model(self, saver, episode):
+        saver.save(self.sess, self.model_path + "/model-" + str(episode) + ".ckpt")
+
+    def noise_action(self,state):
+        action = self.actor_network.action(self.sess,state)
+        return np.clip(action,0.05,0.95) + self.exploration_noise.noise()*self.noise_decay
+
+    def action(self,state):
+        action = self.actor_network.action(self.sess,state)
+        return action
+
+    def perceive(self,transition):
+        # Store the n-step transition (s_t, a_t, R, s_{t+n}, done) in the replay buffer
+        global replay_buffer
+        replay_buffer.add(transition)
+
+    def work(self,coord,saver):
+        global replay_buffer
+        global pause_perceive
+
+        if self.training:
+            episode_count = self.sess.run(self.global_episodes)
+        else:
+            episode_count = 0
+        winning_episode_count = 0
+
+        print("Starting worker_" + str(self.number))
+
+        if self.name == 'worker_0':
+            with open('result.txt','w') as f:
+                f.write(strftime("Starting time: %a, %d %b %Y %H:%M:%S\n", gmtime()))
+
+        self.start() # change Aug24: start the env
+
+        with self.sess.as_default(), self.sess.graph.as_default():
+            #not_start_training_yet = True
+            while not coord.should_stop():
+
+                returns = []
+                episode_buffer = []
+                episode_reward = 0
+                self.noise_decay -= 1./self.explore #np.maximum(abs(np.cos(self.explore / 20 * np.pi)),0.67)
+                self.explore -= 1
+                start_training = episode_count > 50 #replay_buffer.count() >= 500e3
+                erase_buffer = False # erase the replay buffer
+
+                if self.name != "worker_1":
+                    self.sess.run(self.update_local_actor)
+
+                state = self.env.reset()
+                seed = 0.1
+                ea = engineered_action(seed)
+
+                s,s1,s2 = [],[],[]
+                ob = self.env.step(ea)[0]
+                s = ob
+                ob = self.env.step(ea)[0]
+                s1 = ob
+                s = process_state(s,s1)
+
+                if self.name == 'worker_0':
+                    print("episode:{}".format(str(episode_count)+' '+self.name))
+                # Train
+                action = ea
+                demo = int(50*self.noise_decay)
+                for step in xrange(1000):
+                    if self.name == "worker_1" and start_training and self.training:
+                        #pause_perceive=True
+                        #print(self.name+' is training')
+                        self.train()
+                        #pause_perceive=False
+                        if erase_buffer:
+                            pause_perceive = True
+                            replay_buffer.erase() # erase old experience
+                            pause_perceive = False
+                            break
+                    if demo > 0:
+                        action = ea
+                        demo -= 1
+                    elif self.explore > 0 and self.training:
+                        action = np.clip(self.noise_action(s),0.05,0.95) # change Aug20
+                    else:
+                        action = np.clip(self.action(s),0.05,0.95)
+
+                    try:
+                        s2,reward,done,_ = self.env.step(action)
+                    except:
+                        print('Env error. Shutdown {}'.format(self.name))
+                        if self.env is not None:
+                            del self.env
+                        return 0
+
+                    s1 = process_state(s1,s2)
+                    #print(s1)
+                    if s1[2] > 0.75:
+                        height_reward = 0.
+                    else:
+                        height_reward = -0.05
+                    if not done:
+                        ep_reward = 1.005
+                    else:
+                        ep_reward = 0.0
+                    d_head_pelvis = abs(s1[22]-s[1])
+                    #print(d_head_pelvis)
+                    if d_head_pelvis > 0.25:
+                        sta_reward = -0.05
+                    else:
+                        sta_reward = 0.
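+                    # Shaped reward: s1[4] plus the height and head-pelvis stability
+                    # penalties above, scaled by ep_reward (1.005 while the episode is
+                    # still running, 0 on the terminal step).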
+                    #print((s1[4]+height_reward+sta_reward)*ep_reward)
+                    episode_buffer.append([s,action,(s1[4]+height_reward+sta_reward)*ep_reward,s1,done])
+                    if step > self.n_step and not pause_perceive:
+                        transition = n_step_transition(episode_buffer,self.n_step,self.gamma)
+                        self.perceive(transition)
+
+                    s = s1
+                    s1 = s2
+                    episode_reward += reward
+                    if done:
+                        self.exploration_noise.reset(None)
+                        break
+
+                if self.name == 'worker_0' and episode_count % 5 == 0:
+                    with open('result.txt','a') as f:
+                        f.write("Episode "+str(episode_count)+" reward (training): %.2f\n" % episode_reward)
+
+                # Testing:
+                if self.name == 'worker_2' and episode_count % 10 == 0 and episode_count > 1: # change Aug19
+                    if episode_count % 25 == 0 and not self.vis:
+                        self.save_model(saver, episode_count)
+
+                    total_return = 0
+                    TEST = 1
+                    for i in xrange(TEST):
+                        state = self.env.reset()
+                        a = engineered_action(seed)
+                        ob = self.env.step(a)[0]
+                        s = ob
+                        ob = self.env.step(a)[0]
+                        s1 = ob
+                        s = process_state(s,s1)
+                        for j in xrange(1000):
+                            action = self.action(s) # direct (noise-free) action for testing
+                            s2,reward,done,_ = self.env.step(action)
+                            s1 = process_state(s1,s2)
+                            s = s1
+                            s1 = s2
+                            total_return += reward
+                            if done:
+                                break
+
+                    ave_return = total_return/TEST
+                    returns.append(ave_return)
+                    with open('result.txt','a') as f:
+                        f.write('episode: {} Evaluation (testing) Average Return: {}\n'.format(episode_count,ave_return))
+
+                if self.name == 'worker_0' and self.training:
+                    self.sess.run(self.increment)
+                episode_count += 1
+                if episode_count == 100:
+                    del self.env
+                    # All done; stop the trial and confirm exit
+                    print('Done '+self.name)
+                    return
+        # All done; stop the trial and confirm exit
+        print('Done '+self.name)
+        return
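Reviewer note: process_state, engineered_action, update_graph, and n_step_transition are imported from helper.py and are not part of this diff. For readability, here is a minimal sketch of an n-step transition builder consistent with how train() forms its target, y = R + gamma**n_step * Q'(s_{t+n}, pi'(s_{t+n})). The name, signature, and exact windowing below are assumptions for illustration, not the repo's actual implementation.

def n_step_transition_sketch(episode_buffer, n_step, gamma):
    # episode_buffer holds [s_t, a_t, r_t, s_{t+1}, done_t] entries, as appended in work().
    # Take the most recent n_step entries and fold their rewards into one n-step return.
    window = episode_buffer[-n_step:]
    s_t, a_t = window[0][0], window[0][1]
    # R = sum_{k=0}^{n_step-1} gamma**k * r_{t+k}
    R = sum((gamma ** k) * window[k][2] for k in range(n_step))
    s_t_n = window[-1][3]  # state n_step steps after s_t
    done = window[-1][4]   # terminal flag consumed by done_mask in train()
    return [s_t, a_t, R, s_t_n, done]

With n_step = 1 this reduces to the standard DDPG transition (s_t, a_t, r_t, s_{t+1}, done), which is why train() bootstraps with gamma**n_step rather than gamma.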