Source code for citylearn.rl

import random
import numpy as np

# conditional imports
try:
    import torch
    from torch.distributions import Normal
    import torch.nn as nn
    import torch.nn.functional as F
except ImportError:
    raise Exception("This functionality requires you to install torch. You can install torch by : pip install torch torchvision, or for more detailed instructions please visit https://pytorch.org.")

class PolicyNetwork(nn.Module):
    """Tanh-squashed Gaussian policy network."""

    def __init__(self, num_inputs, num_actions, action_space, action_scaling_coef,
                 hidden_dim=[400, 300], init_w=3e-3, log_std_min=-20,
                 log_std_max=2, epsilon=1e-6):
        super(PolicyNetwork, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        self.epsilon = epsilon

        self.linear1 = nn.Linear(num_inputs, hidden_dim[0])
        self.linear2 = nn.Linear(hidden_dim[0], hidden_dim[1])

        self.mean_linear = nn.Linear(hidden_dim[1], num_actions)
        self.log_std_linear = nn.Linear(hidden_dim[1], num_actions)

        # Initialize the output heads with small uniform weights.
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)

        # Affine map from tanh outputs in [-1, 1] to the (scaled) action space.
        self.action_scale = torch.FloatTensor(
            action_scaling_coef * (action_space.high - action_space.low) / 2.)
        self.action_bias = torch.FloatTensor(
            action_scaling_coef * (action_space.high + action_space.low) / 2.)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        mean = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, min=self.log_std_min, max=self.log_std_max)
        return mean, log_std

    def sample(self, state):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        normal = Normal(mean, std)
        x_t = normal.rsample()  # reparameterization trick (mean + std * N(0, 1))
        y_t = torch.tanh(x_t)
        action = y_t * self.action_scale + self.action_bias
        log_prob = normal.log_prob(x_t)
        # Enforcing action bound: correct the log-probability for the tanh squashing.
        log_prob -= torch.log(self.action_scale * (1 - y_t.pow(2)) + self.epsilon)
        log_prob = log_prob.sum(1, keepdim=True)
        mean = torch.tanh(mean) * self.action_scale + self.action_bias
        return action, log_prob, mean

    def to(self, device):
        # Move the (non-parameter) scaling tensors along with the module.
        self.action_scale = self.action_scale.to(device)
        self.action_bias = self.action_bias.to(device)
        return super(PolicyNetwork, self).to(device)
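
# Illustrative usage (a minimal sketch, not part of the original module):
# build a policy over a 2-dimensional action space and sample a batch of
# actions. `_DemoSpace` is a hypothetical stand-in for the Box-like action
# space (only its .high/.low arrays are used), and `_demo_policy_sampling`
# is likewise hypothetical.
class _DemoSpace:
    high = np.array([1.0, 1.0], dtype=np.float32)
    low = np.array([-1.0, -1.0], dtype=np.float32)

def _demo_policy_sampling():
    policy = PolicyNetwork(num_inputs=10, num_actions=2,
                           action_space=_DemoSpace(), action_scaling_coef=0.5)
    state = torch.randn(4, 10)  # batch of 4 ten-dimensional states
    action, log_prob, mean = policy.sample(state)
    # action and mean have shape (4, 2); log_prob has shape (4, 1).
    return action, log_prob, mean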

class ReplayBuffer:
    """Fixed-capacity ring buffer of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        # Grow until capacity is reached, then overwrite the oldest transition.
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)
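
# Illustrative usage (a sketch, not part of the original module): store a few
# random transitions, then draw a training batch. All shapes are arbitrary,
# and `_demo_replay_buffer` is a hypothetical helper.
def _demo_replay_buffer():
    buffer = ReplayBuffer(capacity=1000)
    for _ in range(32):
        buffer.push(state=np.random.randn(10), action=np.random.randn(2),
                    reward=0.0, next_state=np.random.randn(10), done=False)
    # sample() stacks the batch into arrays of shape
    # (8, 10), (8, 2), (8,), (8, 10), (8,).
    return buffer.sample(batch_size=8)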

class RegressionBuffer:
    """Fixed-capacity ring buffer of aligned (features, targets) pairs."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.x = []
        self.y = []
        self.position = 0

    def push(self, variables, targets):
        if len(self.x) < self.capacity and len(self.x) == len(self.y):
            self.x.append(None)
            self.y.append(None)
        self.x[self.position] = variables
        self.y[self.position] = targets
        self.position = (self.position + 1) % self.capacity

    def __len__(self):
        return len(self.x)
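
# Illustrative usage (a sketch, not part of the original module): the buffer
# keeps aligned (features, targets) pairs, e.g. for fitting a regression model
# on past observations. `_demo_regression_buffer` is a hypothetical helper.
def _demo_regression_buffer():
    buffer = RegressionBuffer(capacity=100)
    for _ in range(5):
        buffer.push(np.random.randn(6), np.random.randn(1))
    return np.array(buffer.x), np.array(buffer.y)  # shapes (5, 6) and (5, 1)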

class SoftQNetwork(nn.Module):
    """State-action value network with layer normalization."""

    def __init__(self, num_inputs, num_actions, hidden_size=[400, 300], init_w=3e-3):
        super(SoftQNetwork, self).__init__()
        self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size[0])
        self.linear2 = nn.Linear(hidden_size[0], hidden_size[1])
        self.linear3 = nn.Linear(hidden_size[1], 1)
        self.ln1 = nn.LayerNorm(hidden_size[0])
        self.ln2 = nn.LayerNorm(hidden_size[1])
        # Initialize the output layer with small uniform weights.
        self.linear3.weight.data.uniform_(-init_w, init_w)
        self.linear3.bias.data.uniform_(-init_w, init_w)

    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        x = self.ln1(F.relu(self.linear1(x)))
        x = self.ln2(F.relu(self.linear2(x)))
        x = self.linear3(x)
        return x
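
# Illustrative sketch (not part of the original module) of how these pieces
# fit together in a soft actor-critic update: the critic target uses the
# minimum of two Q networks minus the entropy term alpha * log_prob. The
# function name and the `alpha`/`gamma` hyperparameters are hypothetical;
# `reward` and `done` are assumed to be tensors of shape (batch, 1).
def _demo_soft_q_target(policy, target_q1, target_q2, reward, next_state, done,
                        alpha=0.2, gamma=0.99):
    with torch.no_grad():
        next_action, next_log_prob, _ = policy.sample(next_state)
        q_next = torch.min(target_q1(next_state, next_action),
                           target_q2(next_state, next_action))
        # Entropy-regularized Bellman target:
        # r + gamma * (1 - done) * (min_i Q_i(s', a') - alpha * log pi(a'|s'))
        return reward + gamma * (1.0 - done) * (q_next - alpha * next_log_prob)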