-
Notifications
You must be signed in to change notification settings - Fork 2
/
NEAT_Experiment_cartpole.py
135 lines (106 loc) · 3.72 KB
/
NEAT_Experiment_cartpole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
"""Description:
Evolve a controller for the OpenAI Gym CartPole task with NEAT.
The script works for both CartPole and MountainCar.
"""
import os
import gym
import neat
import numpy as np
import visualize
import csv
from datetime import datetime
# Hyperparameters
num_of_steps = 200        # upper bound on environment steps in one do_rollout episode
num_of_episodes = 1       # not referenced anywhere in the code shown in this file
num_of_generations = 100  # number of generations handed to Population.run
test_episodes = 1         # episodes played by test_best_agent for each generation
# Timestamp of this run; it becomes part of the results CSV file name.
# The time part is written without ':' separators because a colon is not a
# valid file-name character on Windows.
time = datetime.now().strftime("%Y%m%d-%H%M%S")
is_render = False         # when True, rollouts and tests render the environment

# Cart Pole
config_path = 'properties/CartPole-v0/config'
env = gym.make('CartPole-v0')

# Mountain Car
# config_path = 'properties/MountainCar-v0/config'
# env = gym.make('MountainCar-v0')

# Mountain Car (extra long variant)
# config_path = 'properties/MountainExtraLongCar-v0/config'
# env = gym.make('MountainExtraLongCar-v0')

print("action space: ", env.action_space)
print("observation space: ", env.observation_space)

# Number of the generation currently being evaluated (starts at 1);
# evaluation() reads it and increments it once per call.
generation = 1
def evaluation(genomes, config):
    """Assign a fitness to every genome and log the generation's best agent.

    This is the fitness function handed to ``Population.run``. Each genome
    is turned into a feed-forward network and scored with ``do_rollout``.
    The network of the highest-scoring genome is then passed to
    ``test_best_agent`` together with the module-level ``generation``
    counter, and the counter is incremented.

    Args:
        genomes: Iterable of ``(genome_id, genome)`` pairs; ``genome.fitness``
            is set on each genome.
        config: NEAT configuration used to build each network.
    """
    global generation

    scored = []
    for _genome_id, genome in genomes:
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        genome.fitness = do_rollout(net, is_render)
        scored.append((genome, net))

    # An empty population has no best agent to test; skip the logging step
    # instead of failing on the lookup.
    if scored:
        # max() returns the first pair with the highest fitness, which is the
        # same element a stable descending sort would put at index 0, without
        # sorting the whole population.
        _best_genome, best_net = max(scored, key=lambda pair: pair[0].fitness)
        test_best_agent(generation, best_net, is_render)

    generation += 1
def do_rollout(agent, render=False):
    """Play one episode on the module-level ``env`` and return its total reward.

    At every step the observation is fed to ``agent.activate`` and the index
    of the largest output is used as the action. The episode ends when the
    environment reports ``done`` or after ``num_of_steps`` steps, whichever
    comes first.

    Args:
        agent: Object with an ``activate(observation)`` method.
        render: If true, render the environment on every third step.

    Returns:
        The sum of the rewards collected during the episode.
    """
    observation = env.reset()
    episode_return = 0
    for step_index in range(num_of_steps):
        action = np.argmax(agent.activate(observation))
        observation, reward, done, _ = env.step(action)
        episode_return += reward
        # Rendering only every third frame keeps visualisation overhead down.
        if render and step_index % 3 == 0:
            env.render()
        if done:
            break
    return episode_return
def test_best_agent(generation_count, net, render=False):
    """Play test episodes with ``net`` and append the averages to a CSV file.

    Runs ``test_episodes`` episodes on the module-level ``env``, always
    taking the action with the largest network output, and appends one row
    ``[generation_count, mean steps, mean reward]`` to
    ``results/agent_evaluation-<time>.csv`` (``time`` is the module-level
    run timestamp).

    Args:
        generation_count: Generation number written to the first CSV column.
        net: Object with an ``activate(observation)`` method; the index of
            its largest output is used as the action.
        render: If true, render the environment on every third step.
    """
    total_steps = []
    total_rewards = []
    for _ in range(test_episodes):
        ob = env.reset()
        steps = 0
        rewards = 0
        # There is no step cap here: the episode runs until the environment
        # itself reports ``done``.
        while True:
            action = np.argmax(net.activate(ob))
            ob, reward, done, _info = env.step(action)
            rewards += reward
            if render and steps % 3 == 0:
                env.render()
            steps += 1
            if done:
                break
        total_steps.append(steps)
        total_rewards.append(rewards)

    average_steps_per_episode = np.mean(total_steps)
    average_rewards_per_episode = np.mean(total_rewards)

    # One row per generation: generation number, mean steps, mean reward.
    entry = [generation_count, average_steps_per_episode, average_rewards_per_episode]
    results_path = r'results/agent_evaluation-{0}.csv'.format(time)
    # Opening in append mode does not create missing directories, so make
    # sure the output directory exists before the first write.
    os.makedirs(os.path.dirname(results_path), exist_ok=True)
    # newline='' lets the csv module control line endings itself; without it
    # an extra blank line follows every row on Windows.
    with open(results_path, 'a', newline='') as results_file:
        csv.writer(results_file).writerow(entry)
def run(config):
    """Evolve a population with NEAT and report the winning genome.

    Builds the NEAT configuration from the given file, runs the evolution
    for ``num_of_generations`` generations with ``evaluation`` as the fitness
    function, prints the winning genome, and finally plays one more rollout
    with the winner and prints its reward.

    Args:
        config: Path to the NEAT configuration file.
    """
    neat_config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config,
    )

    # The population is the top-level object of a NEAT run.
    population = neat.Population(neat_config)
    # Reporters: a statistics reporter first, then a stdout reporter that
    # shows progress in the terminal.
    population.add_reporter(neat.StatisticsReporter())
    population.add_reporter(neat.StdOutReporter(True))
    # Checkpointing is currently disabled:
    # population.add_reporter(neat.Checkpointer(10, 900))

    champion = population.run(evaluation, num_of_generations)

    # Display the winning genome.
    print('\nBest genome:\n{!s}'.format(champion))

    # Play one more rollout with the winner and report its reward.
    print('\nOutput:')
    champion_net = neat.nn.FeedForwardNetwork.create(champion, neat_config)
    champion_fitness = do_rollout(champion_net, is_render)
    print("Test fitness of the best genome: ", champion_fitness)
# Script entry point: start the NEAT run with the configuration selected above.
if __name__ == "__main__":
    run(config_path)