utils.py
import numpy as np
import torch


class ReplayBuffer(object):
"""
Replay buffer for training with RNN
"""
def __init__(self, capacity, observation_shape, action_dim):
self.capacity = capacity
self.observations = np.zeros((capacity, *observation_shape), dtype=np.uint8)
self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
self.rewards = np.zeros((capacity, 1), dtype=np.float32)
self.done = np.zeros((capacity, 1), dtype=np.bool)
self.index = 0
self.is_filled = False

    def push(self, observation, action, reward, done):
        """
        Add an experience to the replay buffer
        NOTE: observation should be converted to np.uint8 before being pushed
        """
        self.observations[self.index] = observation
        self.actions[self.index] = action
        self.rewards[self.index] = reward
        self.done[self.index] = done
        if self.index == self.capacity - 1:
            self.is_filled = True
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size, chunk_length):
        """
        Sample experiences from the replay buffer (almost) uniformly
        The resulting arrays have shape (batch_size, chunk_length, ...),
        and each sampled chunk is a consecutive sequence within a single episode
        NOTE: a chunk_length that is too large relative to the episode length will cause problems
        """
        episode_borders = np.where(self.done)[0]
        sampled_indexes = []
        for _ in range(batch_size):
            cross_border = True
            while cross_border:
                initial_index = np.random.randint(len(self) - chunk_length + 1)
                final_index = initial_index + chunk_length - 1
                # resample if the chunk would span an episode boundary
                cross_border = np.logical_and(initial_index <= episode_borders,
                                              episode_borders < final_index).any()
            sampled_indexes += list(range(initial_index, final_index + 1))
        sampled_observations = self.observations[sampled_indexes].reshape(
            batch_size, chunk_length, *self.observations.shape[1:])
        sampled_actions = self.actions[sampled_indexes].reshape(
            batch_size, chunk_length, self.actions.shape[1])
        sampled_rewards = self.rewards[sampled_indexes].reshape(
            batch_size, chunk_length, 1)
        sampled_done = self.done[sampled_indexes].reshape(
            batch_size, chunk_length, 1)
        return sampled_observations, sampled_actions, sampled_rewards, sampled_done

    def __len__(self):
        return self.capacity if self.is_filled else self.index
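

# Usage sketch for ReplayBuffer (not part of the original file; the capacity,
# observation shape, and action dimension below are assumed for illustration):
#
#   buffer = ReplayBuffer(capacity=100000, observation_shape=(64, 64, 3), action_dim=1)
#   buffer.push(obs.astype(np.uint8), action, reward, done)  # obs stored as uint8
#   obs_b, act_b, rew_b, done_b = buffer.sample(batch_size=50, chunk_length=50)
#   # obs_b.shape == (50, 50, 64, 64, 3); act_b.shape == (50, 50, 1)
#   # rew_b.shape == done_b.shape == (50, 50, 1)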


def preprocess_obs(obs):
    """
    Convert image observations from [0, 255] to [-0.5, 0.5]
    """
    obs = obs.astype(np.float32)
    normalized_obs = obs / 255.0 - 0.5
    return normalized_obs
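
# Worked example of the normalization above (values chosen for illustration):
#
#   preprocess_obs(np.array([0, 51, 255], dtype=np.uint8))
#   # -> array([-0.5, -0.3,  0.5], dtype=float32)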


def lambda_target(rewards, values, gamma, lambda_):
    """
    Compute the lambda target of the value function
    rewards and values should be 2D tensors of the same size,
    where the first dimension is the time step
    gamma is the discount factor and lambda_ is the weight used to compute the lambda target
    """
    V_lambda = torch.zeros_like(rewards, device=rewards.device)

    H = rewards.shape[0] - 1
    V_n = torch.zeros_like(rewards, device=rewards.device)
    V_n[H] = values[H]
    for n in range(1, H + 1):
        # compute the n-step target
        # NOTE: if it hits the end, fall back to the largest possible n-step return
        V_n[:-n] = (gamma ** n) * values[n:]
        for k in range(1, n + 1):
            if k == n:
                V_n[:-n] += (gamma ** (n - 1)) * rewards[k:]
            else:
                V_n[:-n] += (gamma ** (k - 1)) * rewards[k:-n + k]

        # add the lambda_-weighted n-step target to build the lambda target
        if n == H:
            V_lambda += (lambda_ ** (H - 1)) * V_n
        else:
            V_lambda += (1 - lambda_) * (lambda_ ** (n - 1)) * V_n

    return V_lambda
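

# Usage sketch for lambda_target (not part of the original file; the horizon,
# batch size, gamma, and lambda_ values below are assumed for illustration):
#
#   rewards = torch.rand(15, 49)  # (time step, batch) predicted rewards
#   values = torch.rand(15, 49)   # (time step, batch) predicted values
#   target = lambda_target(rewards, values, gamma=0.99, lambda_=0.95)
#   # target has the same shape as rewards and values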