Agent.py
from collections import namedtuple, deque
import random

import torch
import torch.nn.functional as F
import torch.optim as optim

# Actor, Critic and ReplayBuffer are defined elsewhere in this repository;
# the module paths below are assumptions -- adjust them to match the repo layout.
from model import Actor, Critic
from replay_buffer import ReplayBuffer

# Hyperparameters
lr = 2e-4                  # learning rate for the actor and critic optimizers
wd = 1e-3                  # Adam weight decay (L2 penalty)
lamb = 1e-2                # defined but not used in this file
gamma = 0.99               # discount factor
tau = 1e-3                 # soft-update interpolation factor
buffer_size = int(1e5)     # replay buffer capacity
batch_size = 256           # minibatch size sampled from the buffer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 0
update_every = 16          # learn once every `update_every` environment steps
class Agent():
    def __init__(self, batch_size, buffer_size, state_size, action_size, hidden_layer, hidden2_layer):
        self.seed = torch.manual_seed(seed)

        # Local and target networks for the actor and the critic
        self.Actor = Actor(state_size, action_size, hidden_layer, hidden2_layer, seed).to(device)
        self.Actor_target = Actor(state_size, action_size, hidden_layer, hidden2_layer, seed).to(device)
        self.Critic = Critic(state_size, action_size, hidden_layer, hidden2_layer, seed).to(device)
        self.Critic_target = Critic(state_size, action_size, hidden_layer, hidden2_layer, seed).to(device)

        self.actor_optimizer = optim.Adam(self.Actor.parameters(), lr=lr, weight_decay=wd)
        self.critic_optimizer = optim.Adam(self.Critic.parameters(), lr=lr, weight_decay=wd)

        # Experience replay
        self.memory = ReplayBuffer(buffer_size, batch_size)
        self.batch_size = batch_size
        self.buffer_size = buffer_size
        self.t_step = 0
    def act(self, states):
        """Return deterministic actions from the local actor for the given states."""
        states = torch.from_numpy(states).float().to(device)
        self.Actor.eval()
        with torch.no_grad():
            actions = self.Actor(states).cpu().data.numpy()
        self.Actor.train()
        return actions
    def step(self, states, actions, rewards, next_states, dones):
        """Store an experience tuple and learn every `update_every` steps."""
        self.memory.add(states, actions, rewards, next_states, dones)
        self.t_step = (self.t_step + 1) % update_every
        if self.t_step == 0 and len(self.memory) > self.batch_size:
            experiences = self.memory.sample()
            self.learn(experiences, gamma)
    def learn(self, experiences, gamma):
        states, actions, rewards, next_states, dones = experiences

        # -------------------------- critic update ---------------------------------- #
        # TD target computed from the target networks; no gradients flow into them.
        with torch.no_grad():
            actor_target = self.Actor_target(next_states)
            Q_target = self.Critic_target(next_states, actor_target)
            critic_target = rewards + (gamma * Q_target * (1 - dones))
        critic_local = self.Critic(states, actions)
        critic_loss = F.mse_loss(critic_local, critic_target)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # -------------------------- actor update ----------------------------------- #
        # The actor is trained to maximise the critic's value of its own actions.
        actor_local = self.Actor(states)
        actor_loss = -self.Critic(states, actor_local).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # -------------------------- target update ---------------------------------- #
        self.soft_update(self.Actor, self.Actor_target, self.Critic, self.Critic_target)
    def soft_update(self, actor_local, actor_target, critic_local, critic_target):
        """Polyak-average the local weights into the target networks:
        theta_target <- tau * theta_local + (1 - tau) * theta_target."""
        for local_param, target_param in zip(actor_local.parameters(), actor_target.parameters()):
            target_param.data.copy_(tau * local_param.data + (1 - tau) * target_param.data)
        for local_param, target_param in zip(critic_local.parameters(), critic_target.parameters()):
            target_param.data.copy_(tau * local_param.data + (1 - tau) * target_param.data)
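
A minimal usage sketch for this agent, assuming the classic Gym API (reset() returns a state array, step() returns a 4-tuple) and a continuous-control environment such as Pendulum-v1. The environment name, hidden-layer sizes, episode counts, added batch dimension, and the Gaussian exploration noise are illustrative assumptions, not part of Agent.py.

# Illustrative training loop -- NOT part of Agent.py.
import gym
import numpy as np

env = gym.make("Pendulum-v1")                 # any continuous-action environment
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]

agent = Agent(batch_size=256, buffer_size=int(1e5),
              state_size=state_size, action_size=action_size,
              hidden_layer=256, hidden2_layer=128)

for episode in range(200):
    state = env.reset()
    score = 0.0
    for t in range(1000):
        # act() is deterministic, so add simple Gaussian exploration noise here.
        # The leading batch dimension assumes Actor expects 2-D input.
        action = agent.act(state[None, :].astype(np.float32))[0]
        action = np.clip(action + np.random.normal(0.0, 0.1, size=action_size),
                         env.action_space.low, env.action_space.high)

        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
    print(f"Episode {episode:3d}  return {score:8.1f}")

Because `step()` triggers `learn()` internally once the buffer holds more than a batch of experiences, the loop above only needs to feed transitions in; target networks are updated via `soft_update()` at the end of each learning step.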