# Various math calculations used by algorithms
import numpy as np
import torch
# general math methods
def center_mean(v):
'''Center an array by its mean'''
return v - v.mean()
def normalize(v):
'''Method to normalize a rank-1 np array'''
v_min = v.min()
v_max = v.max()
v_range = v_max - v_min
v_range += 1e-08 # division guard
v_norm = (v - v_min) / v_range
return v_norm
def standardize(v):
'''Method to standardize a rank-1 np array'''
assert len(v) > 1, 'Cannot standardize vector of size 1'
v_std = (v - v.mean()) / (v.std() + 1e-08)
return v_std
def to_one_hot(data, max_val):
'''Convert an int list of data into one-hot vectors'''
return np.eye(max_val)[np.array(data)]
def venv_pack(batch_tensor, num_envs):
'''Apply the reverse of venv_unpack to pack a batch tensor from (b*num_envs, *shape) to (b, num_envs, *shape)'''
shape = list(batch_tensor.shape)
if len(shape) < 2: # scalar data (b, num_envs,)
return batch_tensor.view(-1, num_envs)
else: # non-scalar data (b, num_envs, *shape)
pack_shape = [-1, num_envs] + shape[1:]
return batch_tensor.view(pack_shape)
def venv_unpack(batch_tensor):
Unpack a sampled vec env batch tensor
e.g. for a state with original shape (4, ), vec env should return vec state with shape (num_envs, 4) to store in memory
When sampled with batch_size b, we should get shape (b, num_envs, 4). But we need to unpack the num_envs dimension to get (b * num_envs, 4) for passing to a network. This method does that.
shape = list(batch_tensor.shape)
if len(shape) < 3: # scalar data (b, num_envs,)
return batch_tensor.view(-1)
else: # non-scalar data (b, num_envs, *shape)
unpack_shape = [-1] + shape[2:]
return batch_tensor.view(unpack_shape)
# Policy Gradient calc
# advantage functions
def calc_returns(rewards, dones, gamma):
Calculate the simple returns (full rollout) i.e. sum discounted rewards up till termination
T = len(rewards)
rets = torch.zeros_like(rewards)
future_ret = torch.tensor(0.0, dtype=rewards.dtype)
not_dones = 1 - dones
for t in reversed(range(T)):
rets[t] = future_ret = rewards[t] + gamma * future_ret * not_dones[t]
return rets
def calc_nstep_returns(rewards, dones, next_v_pred, gamma, n):
Estimate the advantages using n-step returns. Ref:
Also see Algorithm S3 from A3C paper for the calculation used below
R^(n)_t = r_{t} + gamma r_{t+1} + ... + gamma^(n-1) r_{t+n-1} + gamma^(n) V(s_{t+n})
rets = torch.zeros_like(rewards)
future_ret = next_v_pred
not_dones = 1 - dones
for t in reversed(range(n)):
rets[t] = future_ret = rewards[t] + gamma * future_ret * not_dones[t]
return rets
def calc_gaes(rewards, dones, v_preds, gamma, lam):
Estimate the advantages using GAE from Schulman et al.
v_preds are values predicted for current states, with one last element as the final next_state
delta is defined as r + gamma * V(s') - V(s) in eqn 10
GAE is defined in eqn 16
This method computes in torch tensor to prevent unnecessary moves between devices (e.g. GPU tensor to CPU numpy)
NOTE any standardization is done outside of this method
T = len(rewards)
assert T + 1 == len(v_preds), f'T+1: {T+1} v.s. v_preds.shape: {v_preds.shape}' # v_preds runs into t+1
gaes = torch.zeros_like(rewards)
future_gae = torch.tensor(0.0, dtype=rewards.dtype)
not_dones = 1 - dones # to reset at episode boundary by multiplying 0
deltas = rewards + gamma * v_preds[1:] * not_dones - v_preds[:-1]
coef = gamma * lam
for t in reversed(range(T)):
gaes[t] = future_gae = deltas[t] + coef * not_dones[t] * future_gae
return gaes
def calc_q_value_logits(state_value, raw_advantages):
mean_adv = raw_advantages.mean(dim=-1).unsqueeze(dim=-1)
return state_value + raw_advantages - mean_adv
# generic variable decay methods
def no_decay(start_val, end_val, start_step, end_step, step):
'''dummy method for API consistency'''
return start_val
def linear_decay(start_val, end_val, start_step, end_step, step):
'''Simple linear decay with annealing'''
if step < start_step:
return start_val
slope = (end_val - start_val) / (end_step - start_step)
val = max(slope * (step - start_step) + start_val, end_val)
return val
def rate_decay(start_val, end_val, start_step, end_step, step, decay_rate=0.9, frequency=20.):
'''Compounding rate decay that anneals in 20 decay iterations until end_step'''
if step < start_step:
return start_val
if step >= end_step:
return end_val
step_per_decay = (end_step - start_step) / frequency
decay_step = (step - start_step) / step_per_decay
val = max(np.power(decay_rate, decay_step) * start_val, end_val)
return val
def periodic_decay(start_val, end_val, start_step, end_step, step, frequency=60.):
Linearly decaying sinusoid that decays in roughly 10 iterations until explore_anneal_epi
Plot the equation below to see the pattern
suppose sinusoidal decay, start_val = 1, end_val = 0.2, stop after 60 unscaled x steps
then we get 0.2+0.5*(1-0.2)(1 + cos x)*(1-x/60)
if step < start_step:
return start_val
if step >= end_step:
return end_val
x_freq = frequency
step_per_decay = (end_step - start_step) / x_freq
x = (step - start_step) / step_per_decay
unit = start_val - end_val
val = end_val * 0.5 * unit * (1 + np.cos(x) * (1 - x / x_freq))
val = max(val, end_val)
return val