# We first import the necessary libraries and define the hyperparameters -

import gym
import random
import numpy as np
import tflearn
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.estimator import regression
from statistics import median, mean
from collections import Counter

LR = 2.33e-4
env = gym.make("CartPole-v0")
observation = env.reset()
goal_steps = 500
score_requirement = 50
initial_games = 10000
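
# (Optional check, not part of the original walkthrough.) Gym exposes the environment's
# spaces directly; for CartPole-v0 the observation is a 4-dimensional vector and the
# action space is Discrete(2), which is why the network below ends in 2 softmax units.
print(env.observation_space)        # Box containing the 4 observation values
print(env.observation_space.shape)  # (4,)
print(env.action_space)             # Discrete(2): push cart left (0) or right (1)
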
# Now we will define a function to generate the training data -

def initial_population():
    # [OBS, MOVES]
    training_data = []
    # all scores:
    scores = []
    # scores above our threshold:
    accepted_scores = []
    # iterate over the episodes
    for _ in range(initial_games):
        score = 0
        # moves specifically from this episode:
        episode_memory = []
        # previous observation that we saw
        prev_observation = []
        for _ in range(goal_steps):
            # choose a random action, left or right i.e. (0 or 1)
            action = random.randrange(0, 2)
            observation, reward, done, info = env.step(action)

            # since the observation is returned FROM the action,
            # we store the previous observation and the corresponding action
            if len(prev_observation) > 0:
                episode_memory.append([prev_observation, action])
            prev_observation = observation
            score += reward
            if done:
                break

        # reinforcement methodology here.
        # IF our score is higher than our threshold, we save the episode.
        # All we're doing is reinforcing the score; we're not trying
        # to influence the machine in any way as to HOW that score is
        # reached.
        if score >= score_requirement:
            accepted_scores.append(score)
            for data in episode_memory:
                # convert the action to one-hot (this is the output layer for our neural network)
                if data[1] == 1:
                    output = [0, 1]
                elif data[1] == 0:
                    output = [1, 0]

                # saving our training data
                training_data.append([data[0], output])

        # reset env to play again
        env.reset()
        # save overall scores
        scores.append(score)

    # return the collected [observation, one-hot action] pairs
    return training_data
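
# The statistics imports (mean, median, Counter) are not used above; they are handy as
# an optional sanity check on the sampled episodes. A minimal sketch, assuming the
# lines are placed just before the return statement inside initial_population():
#
#     print('Average accepted score:', mean(accepted_scores))
#     print('Median accepted score:', median(accepted_scores))
#     print(Counter(accepted_scores))
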
# Now, using tflearn, we will define our neural network -

def neural_network_model(input_size):

    network = input_data(shape=[None, input_size, 1], name='input')

    # note: in tflearn, the second argument of dropout() is the keep probability,
    # so 0.8 means 80% of the units are kept at each training step
    network = fully_connected(network, 128, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 512, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 128, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 2, activation='softmax')
    network = regression(network, optimizer='adam', learning_rate=LR,
                         loss='categorical_crossentropy', name='targets')
    model = tflearn.DNN(network, tensorboard_dir='log')

    return model
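
# The input layer above is declared as [None, input_size, 1], so every observation fed
# to the model (for fitting or prediction) has to be reshaped to that layout; this is
# exactly what train_model() and the play loop below do. A tiny illustration with a
# made-up 4-value observation:
example_obs = np.array([0.01, -0.02, 0.03, 0.04])    # hypothetical CartPole observation
print(example_obs.reshape(-1, 4, 1).shape)           # -> (1, 4, 1)
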
# It is time to train the model now -

def train_model(training_data, model=False):

    X = np.array([i[0] for i in training_data]).reshape(-1, len(training_data[0][0]), 1)
    y = [i[1] for i in training_data]

    if not model:
        model = neural_network_model(input_size=len(X[0]))

    model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500,
              show_metric=True, run_id='openai_CartPole')
    return model

training_data = initial_population()

model = train_model(training_data)
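
# Optionally (not part of the original flow), the trained weights can be persisted with
# tflearn's DNN.save() and restored later with DNN.load() on an identically built
# network; the filename here is arbitrary.
model.save('cartpole-nn.tflearn')
# later / in another session, after rebuilding the same graph:
#     model = neural_network_model(input_size=4)
#     model.load('cartpole-nn.tflearn')
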
# Training complete; now let's play the game and see what the output looks like -

scores = []
choices = []
for each_game in range(10):
    score = 0
    game_memory = []
    prev_obs = []
    env.reset()
    for _ in range(goal_steps):
        env.render()

        # the very first move is random; after that we act on the model's prediction
        if len(prev_obs) == 0:
            action = random.randrange(0, 2)
        else:
            action = np.argmax(model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0])

        choices.append(action)

        new_observation, reward, done, info = env.step(action)
        prev_obs = new_observation
        game_memory.append([new_observation, action])
        score += reward
        if done:
            break

    scores.append(score)

print('Average Score:', sum(scores) / len(scores))
print('choice 1: {}%  choice 0: {}%'.format(
    100.0 * choices.count(1) / len(choices),
    100.0 * choices.count(0) / len(choices)))
print('Score requirement:', score_requirement)
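
# Finally (an assumed tidy-up, not in the original script), close the render window
# once the evaluation games are finished.
env.close()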