A2C.py
import torch as th
from torch import nn
from torch.optim import Adam, RMSprop
import numpy as np
from common.Agent import Agent
from common.Model import ActorNetwork, CriticNetwork
from common.utils import entropy, index_to_one_hot, to_tensor_var


class A2C(Agent):
    """
    An agent trained with Advantage Actor-Critic (A2C)
    - the actor takes the state as input
    - the critic takes both state and action as input
    - the agent interacts with the environment to collect experience
    - the agent trains on that experience to update its policy
    """

    def __init__(self, env, state_dim, action_dim,
                 memory_capacity=10000, max_steps=None,
                 roll_out_n_steps=10,
                 reward_gamma=0.99, reward_scale=1., done_penalty=None,
                 actor_hidden_size=32, critic_hidden_size=32,
                 actor_output_act=nn.functional.log_softmax, critic_loss="mse",
                 actor_lr=0.001, critic_lr=0.001,
                 optimizer_type="rmsprop", entropy_reg=0.01,
                 max_grad_norm=0.5, batch_size=100, episodes_before_train=100,
                 epsilon_start=0.9, epsilon_end=0.01, epsilon_decay=200,
                 use_cuda=True):
        super(A2C, self).__init__(env, state_dim, action_dim,
                                  memory_capacity, max_steps,
                                  reward_gamma, reward_scale, done_penalty,
                                  actor_hidden_size, critic_hidden_size,
                                  actor_output_act, critic_loss,
                                  actor_lr, critic_lr,
                                  optimizer_type, entropy_reg,
                                  max_grad_norm, batch_size, episodes_before_train,
                                  epsilon_start, epsilon_end, epsilon_decay,
                                  use_cuda)

        self.roll_out_n_steps = roll_out_n_steps

        self.actor = ActorNetwork(self.state_dim, self.actor_hidden_size,
                                  self.action_dim, self.actor_output_act)
        self.critic = CriticNetwork(self.state_dim, self.action_dim,
                                    self.critic_hidden_size, 1)
        if self.optimizer_type == "adam":
            self.actor_optimizer = Adam(self.actor.parameters(), lr=self.actor_lr)
            self.critic_optimizer = Adam(self.critic.parameters(), lr=self.critic_lr)
        elif self.optimizer_type == "rmsprop":
            self.actor_optimizer = RMSprop(self.actor.parameters(), lr=self.actor_lr)
            self.critic_optimizer = RMSprop(self.critic.parameters(), lr=self.critic_lr)
        # move both networks to the GPU; the critic also receives CUDA tensors in train()
        if self.use_cuda:
            self.actor.cuda()
            self.critic.cuda()

    # agent interacts with the environment to collect experience
    def interact(self):
        super(A2C, self)._take_n_steps()

    # train on a roll-out batch
    def train(self):
        # do not start training until enough episodes have been collected
        if self.n_episodes <= self.episodes_before_train:
            return

        batch = self.memory.sample(self.batch_size)
        states_var = to_tensor_var(batch.states, self.use_cuda).view(-1, self.state_dim)
        one_hot_actions = index_to_one_hot(batch.actions, self.action_dim)
        actions_var = to_tensor_var(one_hot_actions, self.use_cuda).view(-1, self.action_dim)
        rewards_var = to_tensor_var(batch.rewards, self.use_cuda).view(-1, 1)
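        # Note: batch.rewards is assumed to already hold the discounted n-step
        # return computed during roll-out (in the Agent base class), e.g. with
        # roll_out_n_steps = 3 and gamma = 0.99:
        #     R_t = r_t + 0.99 * r_{t+1} + 0.99**2 * r_{t+2} + 0.99**3 * V(s_{t+3}),
        # where V(s_{t+3}) is the critic's bootstrap value of the final state.
        # It is therefore used directly as the critic target below.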

        # update actor network
        self.actor_optimizer.zero_grad()
        action_log_probs = self.actor(states_var)
        # entropy bonus encourages exploration
        entropy_loss = th.mean(entropy(th.exp(action_log_probs)))
        # log-probability of the actions that were actually taken
        action_log_probs = th.sum(action_log_probs * actions_var, 1)
        values = self.critic(states_var, actions_var)
        # advantage = n-step return - critic baseline (detached so the
        # policy-gradient loss does not backpropagate into the critic)
        advantages = rewards_var - values.detach()
        pg_loss = -th.mean(action_log_probs * advantages)
        actor_loss = pg_loss - entropy_loss * self.entropy_reg
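        # Example: with entropy_reg = 0.01, a near-uniform policy over two
        # actions has entropy ln(2) ~= 0.693 and contributes about -0.007 to
        # actor_loss, so exploration is only mildly rewarded relative to the
        # policy-gradient term.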
        actor_loss.backward()
        if self.max_grad_norm is not None:
            nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)
        self.actor_optimizer.step()

        # update critic network
        self.critic_optimizer.zero_grad()
        target_values = rewards_var
        if self.critic_loss == "huber":
            critic_loss = nn.functional.smooth_l1_loss(values, target_values)
        else:
            critic_loss = nn.MSELoss()(values, target_values)
        critic_loss.backward()
        if self.max_grad_norm is not None:
            nn.utils.clip_grad_norm_(self.critic.parameters(), self.max_grad_norm)
        self.critic_optimizer.step()

    # predict softmax action based on state
    def _softmax_action(self, state):
        state_var = to_tensor_var([state], self.use_cuda)
        softmax_action_var = th.exp(self.actor(state_var))
        if self.use_cuda:
            softmax_action = softmax_action_var.data.cpu().numpy()[0]
        else:
            softmax_action = softmax_action_var.data.numpy()[0]
        return softmax_action

    # choose an action based on state, with epsilon-greedy exploration during training
    def exploration_action(self, state):
        softmax_action = self._softmax_action(state)
        epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                  np.exp(-1. * self.n_steps / self.epsilon_decay)
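        # Example schedule: with epsilon_start=0.9, epsilon_end=0.01 and
        # epsilon_decay=200, epsilon is ~0.55 after 100 steps and ~0.13 after
        # 400 steps, so random actions become rare as training progresses.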
        if np.random.rand() < epsilon:
            action = np.random.choice(self.action_dim)
        else:
            action = np.argmax(softmax_action)
        return action

    # choose an action based on state for execution
    def action(self, state):
        softmax_action = self._softmax_action(state)
        action = np.argmax(softmax_action)
        return action

    # evaluate value for a state-action pair
    def value(self, state, action):
        state_var = to_tensor_var([state], self.use_cuda)
        action = index_to_one_hot(action, self.action_dim)
        action_var = to_tensor_var([action], self.use_cuda)
        value_var = self.critic(state_var, action_var)
        if self.use_cuda:
            value = value_var.data.cpu().numpy()[0]
        else:
            value = value_var.data.numpy()[0]
        return value
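

# Minimal usage sketch (not part of the original file): assumes a Gym-style
# environment with a discrete action space and the interact()/train() loop
# conventions of this repo's run scripts. The environment id "CartPole-v1"
# and the loop counts below are illustrative only.
if __name__ == "__main__":
    import gym

    env = gym.make("CartPole-v1")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    a2c = A2C(env, state_dim, action_dim,
              batch_size=100, episodes_before_train=100,
              use_cuda=th.cuda.is_available())

    for _ in range(10000):
        a2c.interact()
        if a2c.n_episodes > a2c.episodes_before_train:
            a2c.train()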