--- a
+++ b/baselines/ppo2/ppo2.py
@@ -0,0 +1,361 @@
+import os
+import time
+import joblib
+import numpy as np
+import os.path as osp
+import tensorflow as tf
+from baselines import logger
+from collections import deque
+from baselines.common import explained_variance
+from baselines.common.runners import AbstractEnvRunner
+
+import pickle
+import copy
+
+class Model(object):
+    def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
+                nsteps, ent_coef, vf_coef, max_grad_norm):
+        sess = tf.get_default_session()
+
+        act_model = policy(sess, ob_space, ac_space, nbatch_act, 1, reuse=False)
+        train_model = policy(sess, ob_space, ac_space, nbatch_train, nsteps, reuse=True)
+
+        A = train_model.pdtype.sample_placeholder([None])
+        ADV = tf.placeholder(tf.float32, [None])
+        R = tf.placeholder(tf.float32, [None])
+        OLDNEGLOGPAC = tf.placeholder(tf.float32, [None])
+        OLDVPRED = tf.placeholder(tf.float32, [None])
+        LR = tf.placeholder(tf.float32, [])
+        CLIPRANGE = tf.placeholder(tf.float32, [])
+
+        neglogpac = train_model.pd.neglogp(A)
+        entropy = tf.reduce_mean(train_model.pd.entropy())
+
+        vpred = train_model.vf
+        vpredclipped = OLDVPRED + tf.clip_by_value(train_model.vf - OLDVPRED, - CLIPRANGE, CLIPRANGE)
+        vf_losses1 = tf.square(vpred - R)
+        vf_losses2 = tf.square(vpredclipped - R)
+        vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
+        ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
+        pg_losses = -ADV * ratio
+        pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
+        pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
+        approxkl = .5 * tf.reduce_mean(tf.square(neglogpac - OLDNEGLOGPAC))
+        clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
+        loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef
+        with tf.variable_scope('model'):
+            params = tf.trainable_variables()
+        grads = tf.gradients(loss, params)
+        if max_grad_norm is not None:
+            grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
+        grads = list(zip(grads, params))
+        trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
+        _train = trainer.apply_gradients(grads)
+
+        def train(lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
+            advs = returns - values
+            advs = (advs - advs.mean()) / (advs.std() + 1e-8)
+            td_map = {train_model.X:obs, A:actions, ADV:advs, R:returns, LR:lr,
+                    CLIPRANGE:cliprange, OLDNEGLOGPAC:neglogpacs, OLDVPRED:values}
+            if states is not None:
+                td_map[train_model.S] = states
+                td_map[train_model.M] = masks
+            return sess.run(
+                [pg_loss, vf_loss, entropy, approxkl, clipfrac, _train],
+                td_map
+            )[:-1]
+        self.loss_names = ['policy_loss', 'value_loss', 'policy_entropy', 'approxkl', 'clipfrac']
+
+        def save(save_path):
+            ps = sess.run(params)
+            joblib.dump(ps, save_path)
+
+        def load(load_path):
+            loaded_params = joblib.load(load_path)
+            restores = []
+            for p, loaded_p in zip(params, loaded_params):
+                restores.append(p.assign(loaded_p))
+            sess.run(restores)
+            # If you want to load weights, also save/load observation scaling inside VecNormalize
+
+        self.train = train
+        self.train_model = train_model
+        self.act_model = act_model
+        self.step = act_model.step
+        self.value = act_model.value
+        self.initial_state = act_model.initial_state
+        self.save = save
+        self.load = load
+        tf.global_variables_initializer().run(session=sess) #pylint: disable=E1101
+
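For reference, the clipped policy and value losses built in the graph above correspond to the
following plain-NumPy computation (an illustrative sketch, not part of the patch; the array
names are placeholders):

    import numpy as np

    def ppo_losses(adv, ratio, vpred, old_vpred, returns, cliprange):
        # ratio = exp(old_neglogpac - neglogpac), i.e. pi_new(a|s) / pi_old(a|s)
        # Policy term: pessimistic (elementwise max) of the unclipped and clipped surrogates.
        pg1 = -adv * ratio
        pg2 = -adv * np.clip(ratio, 1.0 - cliprange, 1.0 + cliprange)
        pg_loss = np.mean(np.maximum(pg1, pg2))
        # Value term: keep the new value prediction within cliprange of the old one.
        vclipped = old_vpred + np.clip(vpred - old_vpred, -cliprange, cliprange)
        vf_loss = .5 * np.mean(np.maximum((vpred - returns) ** 2, (vclipped - returns) ** 2))
        return pg_loss, vf_loss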
+class Runner(AbstractEnvRunner):
+
+    def __init__(self, *, env, model, nsteps, gamma, lam, writer, num_casks=0):
+        super().__init__(env=env, model=model, nsteps=nsteps)
+        self.lam = lam
+        self.gamma = gamma
+
+        self.nenvs = env.num_envs
+        self.good = set()
+        self.valid = self.nenvs - num_casks
+        self.casks = set()
+
+        # tensorboard
+        self.writer = writer
+        self.num_episode = 0
+
+    def run(self):
+        mb_obs, mb_rewards, mb_actions, mb_values, mb_dones, mb_neglogpacs = [], [], [], [], [], []
+        for i in range(self.nenvs):
+            mb_obs.append([])
+            mb_rewards.append([])
+            mb_actions.append([])
+            mb_values.append([])
+            mb_dones.append([])
+            mb_neglogpacs.append([])
+
+        mb_states = self.states
+        epinfos = []
+
+        # not initialize cask agents last run
+        # self.good = set([i for i in range(self.nenvs)])
+        self.good = set()
+        for i in range(self.nenvs):
+            if i not in self.casks:
+                self.good.add(i)
+        while True:
+            actions, values, self.states, neglogpacs = self.model.step(self.obs, self.states, self.dones)
+
+            tmp_obs = self.obs.copy()
+            for i in range(self.nenvs):
+                if i in self.good or i in self.casks:
+                    mb_obs[i].append(tmp_obs[i])
+                    # mb_actions.append(actions)
+                    mb_values[i].append(values[i])
+                    mb_neglogpacs[i].append(neglogpacs[i])
+                    mb_dones[i].append(self.dones[i])
+                if i not in self.good:
+                    actions[i] = False
+            self.casks = set()
+            self.obs[:], rewards, self.dones, infos = self.env.step(actions)
+
+            self.good = set()
+            for i in range(self.nenvs):
+                if not infos[i].get("bad", True):
+                    self.good.add(i)
+
+            for i in range(self.nenvs):
+                if i in self.good:
+                    action = infos[i].get("action", actions[i])
+                    mb_actions[i].append(action)
+                    mb_rewards[i].append(rewards[i])
+
+                    if len(mb_rewards[i]) >= self.nsteps:
+                        self.good.remove(i)
+
+            print([len(mb_rewards[i]) for i in range(self.nenvs)])
+            print(self.good)
+
+            # when done, add episodic information to tensorboard
+            for i in range(self.nenvs):
+                if self.dones[i] and i in self.good:
+                    epinfos.append({'r': infos[i]['episode']['r'], 'l': infos[i]['episode']['l'], 'sr': infos[i]['episode']['shaped_reward']})
+                    self.num_episode += 1
+                    summary = tf.Summary()
+                    summary.value.add(tag='episode/length', simple_value=infos[i]['episode']['l'])
+                    summary.value.add(tag='episode/original_reward', simple_value=infos[i]['episode']['r'])
+                    summary.value.add(tag='episode/shaped_reward', simple_value=infos[i]['episode']['shaped_reward'])
+                    summary.value.add(tag='penalty/activation_penalty', simple_value=infos[i]['episode']['activation_penalty'])
+                    summary.value.add(tag='penalty/vx_penalty', simple_value=infos[i]['episode']['vx_penalty'])
+                    summary.value.add(tag='penalty/vz_penalty', simple_value=infos[i]['episode']['vz_penalty'])
+                    self.writer.add_summary(summary, self.num_episode)
+
+            # Cask Effect: top self.nenvs - num_casks is ready
+            # if all([len(mb_rewards[i]) >= 128 for i in range(self.nenvs)]):
+            if sum([len(mb_rewards[i]) >= self.nsteps for i in range(self.nenvs)]) >= self.valid:
+                for i in range(self.nenvs):
+                    if len(mb_rewards[i]) < self.nsteps:
+                        self.casks.add(i)
+                break
+
+        # remove casks' sample
+        cask_list = sorted(list(self.casks))
+        print('casks:', cask_list)
+        cask_list.reverse()
+        for i in cask_list:
+            mb_obs.pop(i)
+            mb_rewards.pop(i)
+            mb_actions.pop(i)
+            mb_values.pop(i)
+            mb_neglogpacs.pop(i)
+            mb_dones.pop(i)
+
+        #batch of steps to batch of rollouts
+        mb_obs = np.asarray(mb_obs, dtype=self.obs.dtype).transpose((1, 0, 2))
+        mb_rewards = np.asarray(mb_rewards, dtype=np.float32).transpose((1, 0))
+        mb_actions = np.asarray(mb_actions).transpose((1, 0, 2))
+        mb_values = np.asarray(mb_values, dtype=np.float32).transpose((1, 0))
+        mb_neglogpacs = np.asarray(mb_neglogpacs, dtype=np.float32).transpose((1, 0))
+        mb_dones = np.asarray(mb_dones, dtype=np.bool).transpose((1, 0))
+        last_values = self.model.value(self.obs, self.states, self.dones)
+
+        last_values = last_values.tolist()
+        copy_dones = copy.deepcopy(self.dones)
+        self.dones = self.dones.tolist()
+        for i in cask_list:
+            last_values.pop(i)
+            self.dones.pop(i)
+        last_values = np.array(last_values)
+        self.dones = np.array(self.dones)
+
+        #discount/bootstrap off value fn
+        mb_returns = np.zeros_like(mb_rewards)
+        mb_advs = np.zeros_like(mb_rewards)
+        lastgaelam = 0
+        for t in reversed(range(self.nsteps)):
+            if t == self.nsteps - 1:
+                nextnonterminal = 1.0 - self.dones
+                nextvalues = last_values
+            else:
+                nextnonterminal = 1.0 - mb_dones[t+1]
+                nextvalues = mb_values[t+1]
+            delta = mb_rewards[t] + self.gamma * nextvalues * nextnonterminal - mb_values[t]
+            mb_advs[t] = lastgaelam = delta + self.gamma * self.lam * nextnonterminal * lastgaelam
+        mb_returns = mb_advs + mb_values
+
+        # revert valid + cask dimension dones
+        self.dones = copy_dones
+
+        return (*map(sf01, (mb_obs, mb_returns, mb_dones, mb_actions, mb_values, mb_neglogpacs)),
+            mb_states, epinfos)
+# obs, returns, masks, actions, values, neglogpacs, states = runner.run()
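The run() loop above implements the "Cask Effect" sampling scheme: all environments keep
stepping until at least nenvs - num_casks of them have collected nsteps transitions; the
environments that are still short are marked as casks and their partial rollouts are thrown
away. A minimal stand-alone illustration of that selection rule, using made-up sample counts:

    nsteps, num_casks = 4, 2
    sample_counts = [4, 2, 4, 1, 4, 4]            # transitions gathered so far, per env
    valid = len(sample_counts) - num_casks        # how many envs must be finished
    if sum(c >= nsteps for c in sample_counts) >= valid:
        casks = {i for i, c in enumerate(sample_counts) if c < nsteps}
        print(sorted(casks))                      # [1, 3] -> these rollouts are discarded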
+def sf01(arr):
+    """
+    swap and then flatten axes 0 and 1
+    """
+    s = arr.shape
+    return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
+
+def constfn(val):
+    def f(_):
+        return val
+    return f
+
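sf01() turns the (nsteps, nenvs) rollout arrays returned by Runner.run() into flat batches in
which each environment's steps stay contiguous, which is what the recurrent minibatching in
learn() below relies on. A quick shape check with illustrative sizes:

    import numpy as np

    batch = np.zeros((128, 6, 41))   # nsteps=128, 6 surviving envs, 41-dim observations
    flat = sf01(batch)
    print(flat.shape)                # (768, 41); rows 0..127 all come from env 0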
+def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr,
+            vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
+            log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
+            save_interval=0, load_path=None, num_casks=0):
+
+    if isinstance(lr, float): lr = constfn(lr)
+    else: assert callable(lr)
+    if isinstance(cliprange, float): cliprange = constfn(cliprange)
+    else: assert callable(cliprange)
+    total_timesteps = int(total_timesteps)
+
+    nenvs = env.num_envs - num_casks
+    ob_space = env.observation_space
+    ac_space = env.action_space
+    nbatch = nenvs * nsteps
+    nbatch_train = nbatch // nminibatches
+
+    make_model = lambda : Model(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=env.num_envs, nbatch_train=nbatch_train,
+                    nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
+                    max_grad_norm=max_grad_norm)
+    if save_interval and logger.get_dir():
+        import cloudpickle
+        with open(osp.join(logger.get_dir(), 'make_model.pkl'), 'wb') as fh:
+            fh.write(cloudpickle.dumps(make_model))
+    model = make_model()
+    if load_path is not None:
+        model.load(load_path)
+        # load running mean std
+        checkdir = load_path[0:-5]
+        checkpoint = int(load_path.split('/')[-1])
+        if osp.exists(osp.join(checkdir, '%.5i_ob_rms.pkl' % checkpoint)):
+            with open(osp.join(checkdir, '%.5i_ob_rms.pkl' % checkpoint), 'rb') as ob_rms_fp:
+                env.ob_rms = pickle.load(ob_rms_fp)
+        # if osp.exists(osp.join(checkdir, '%.5i_ret_rms.pkl' % checkpoint)):
+        #     with open(osp.join(checkdir, '%.5i_ret_rms.pkl' % checkpoint), 'rb') as ret_rms_fp:
+        #         env.ret_rms = pickle.load(ret_rms_fp)
+    # tensorboard
+    writer = tf.summary.FileWriter(logger.get_dir(), tf.get_default_session().graph)
+    runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, lam=lam, writer=writer, num_casks=num_casks)
+
+    epinfobuf = deque(maxlen=100)
+    tfirststart = time.time()
+
+    nupdates = total_timesteps//nbatch
+    for update in range(1, nupdates+1):
+        assert nbatch % nminibatches == 0
+        nbatch_train = nbatch // nminibatches
+        tstart = time.time()
+        frac = 1.0 - (update - 1.0) / nupdates
+        lrnow = lr(frac)
+        cliprangenow = cliprange(frac)
+        obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632
+        epinfobuf.extend(epinfos)
+        mblossvals = []
+        if states is None: # nonrecurrent version
+            inds = np.arange(nbatch)
+            for _ in range(noptepochs):
+                np.random.shuffle(inds)
+                for start in range(0, nbatch, nbatch_train):
+                    end = start + nbatch_train
+                    mbinds = inds[start:end]
+                    slices = (arr[mbinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
+                    mblossvals.append(model.train(lrnow, cliprangenow, *slices))
+        else: # recurrent version
+            assert nenvs % nminibatches == 0
+            envsperbatch = nenvs // nminibatches
+            envinds = np.arange(nenvs)
+            flatinds = np.arange(nenvs * nsteps).reshape(nenvs, nsteps)
+            envsperbatch = nbatch_train // nsteps
+            for _ in range(noptepochs):
+                np.random.shuffle(envinds)
+                for start in range(0, nenvs, envsperbatch):
+                    end = start + envsperbatch
+                    mbenvinds = envinds[start:end]
+                    mbflatinds = flatinds[mbenvinds].ravel()
+                    slices = (arr[mbflatinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
+                    mbstates = states[mbenvinds]
+                    mblossvals.append(model.train(lrnow, cliprangenow, *slices, mbstates))
+
+        lossvals = np.mean(mblossvals, axis=0)
+        tnow = time.time()
+        fps = int(nbatch / (tnow - tstart))
+        if update % log_interval == 0 or update == 1:
+            ev = explained_variance(values, returns)
+            logger.logkv("serial_timesteps", update*nsteps)
+            logger.logkv("nupdates", update)
+            logger.logkv("total_timesteps", update*nbatch)
+            logger.logkv("fps", fps)
+            logger.logkv("explained_variance", float(ev))
+            logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
+            logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
+            logger.logkv('epsrewmean', safemean([epinfo['sr'] for epinfo in epinfobuf]))
+            logger.logkv('time_elapsed', tnow - tfirststart)
+            for (lossval, lossname) in zip(lossvals, model.loss_names):
+                logger.logkv(lossname, lossval)
+            logger.dumpkvs()
+            # tensorboard
+            summary = tf.Summary()
+            summary.value.add(tag='iteration/reward_mean', simple_value=safemean([epinfo['r'] for epinfo in epinfobuf]))
+            summary.value.add(tag='iteration/length_mean', simple_value=safemean([epinfo['l'] for epinfo in epinfobuf]))
+            summary.value.add(tag='iteration/shaped_reward_mean', simple_value=safemean([epinfo['sr'] for epinfo in epinfobuf]))
+            summary.value.add(tag='iteration/fps', simple_value=fps)
+            writer.add_summary(summary, update)
+        if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir():
+            checkdir = osp.join(logger.get_dir(), 'checkpoints')
+            os.makedirs(checkdir, exist_ok=True)
+            savepath = osp.join(checkdir, '%.5i'%update)
+            print('Saving to', savepath)
+            model.save(savepath)
+            # save running mean std
+            with open(osp.join(checkdir, '%.5i_ob_rms.pkl' % update), 'wb') as ob_rms_fp:
+                pickle.dump(env.ob_rms, ob_rms_fp)
+            with open(osp.join(checkdir, '%.5i_ret_rms.pkl' % update), 'wb') as ret_rms_fp:
+                pickle.dump(env.ret_rms, ret_rms_fp)
+    env.close()
+
+def safemean(xs):
+    return np.nan if len(xs) == 0 else np.mean(xs)
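A note on the checkpoint paths used above: load_path is expected to point at a file written by
the save branch (checkpoints/%.5i, a five-digit update index), and the running observation
statistics are looked up next to it. With a hypothetical path:

    load_path = '/logs/run1/checkpoints/00050'      # hypothetical checkpoint
    checkdir = load_path[0:-5]                      # '/logs/run1/checkpoints/'
    checkpoint = int(load_path.split('/')[-1])      # 50
    # ob_rms is then restored from '/logs/run1/checkpoints/00050_ob_rms.pkl'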