# dqn.py (forked from sherjilozair/dqn)
# NOTE: this file targets the Keras 1.x API on the Theano backend
# (Convolution2D subsample/border_mode arguments, theano.gradient.disconnected_grad).
import random

import numpy
from keras.models import Model
from keras.layers import Convolution2D, Dense, Flatten, Input
from keras.optimizers import RMSprop
from keras import backend as K
import theano.tensor as TT
from theano.gradient import disconnected_grad


class Agent:
    """Simple DQN-style agent.

    Stores the last `memory` episodes of (state, action, reward) trajectories,
    acts epsilon-greedily with respect to a convolutional Q-network, and after
    every observed reward trains on a random minibatch of stored transitions.
    """

    def __init__(self, state_size=None, number_of_actions=1,
                 epsilon=0.1, mbsz=32, discount=0.9, memory=50,
                 save_name='basic', save_freq=10):
        self.state_size = state_size
        self.number_of_actions = number_of_actions
        self.epsilon = epsilon        # exploration rate for epsilon-greedy action selection
        self.mbsz = mbsz              # minibatch size
        self.discount = discount      # reward discount factor (gamma)
        self.memory = memory          # number of recent episodes kept for replay
        self.save_name = save_name
        self.states = []
        self.actions = []
        self.rewards = []
        self.experience = []
        self.i = 1
        self.save_freq = save_freq
        self.build_functions()

    def build_model(self):
        # Convolutional Q-network: state image in, one Q-value per action out.
        S = Input(shape=self.state_size)
        h = Convolution2D(16, 8, 8, subsample=(4, 4),
                          border_mode='same', activation='relu')(S)
        h = Convolution2D(32, 4, 4, subsample=(2, 2),
                          border_mode='same', activation='relu')(h)
        h = Flatten()(h)
        h = Dense(256, activation='relu')(h)
        V = Dense(self.number_of_actions)(h)
        self.model = Model(S, V)
        try:
            self.model.load_weights('{}.h5'.format(self.save_name))
            print("loading from {}.h5".format(self.save_name))
        except Exception:
            print("Training a new model")

    def build_functions(self):
        S = Input(shape=self.state_size)
        NS = Input(shape=self.state_size)
        A = Input(shape=(1,), dtype='int32')    # action taken
        R = Input(shape=(1,), dtype='float32')  # reward received
        T = Input(shape=(1,), dtype='int32')    # 1 if the frame was terminal
        self.build_model()
        self.value_fn = K.function([S], self.model(S))

        VS = self.model(S)
        # Target-side values: no gradient flows through the next-state estimate.
        VNS = disconnected_grad(self.model(NS))
        future_value = (1 - T) * VNS.max(axis=1, keepdims=True)
        discounted_future_value = self.discount * future_value
        target = R + discounted_future_value
        # Q-value of the action actually taken in each sample (row-wise gather).
        q_sa = VS[TT.arange(A.shape[0]), A.flatten()]
        cost = ((q_sa - target.flatten()) ** 2).mean()
        opt = RMSprop(0.0001)
        params = self.model.trainable_weights
        updates = opt.get_updates(params, {}, cost)
        self.train_fn = K.function([S, NS, A, R, T], cost, updates=updates)

    def new_episode(self):
        # Start fresh per-episode buffers and keep only the last `memory` episodes.
        self.states.append([])
        self.actions.append([])
        self.rewards.append([])
        self.states = self.states[-self.memory:]
        self.actions = self.actions[-self.memory:]
        self.rewards = self.rewards[-self.memory:]
        self.i += 1
        if self.i % self.save_freq == 0:
            self.model.save_weights('{}.h5'.format(self.save_name), True)

    def end_episode(self):
        pass

    def act(self, state):
        self.states[-1].append(state)
        values = self.value_fn([state[None, :]])
        # Epsilon-greedy: explore with probability epsilon, otherwise act greedily.
        if numpy.random.random() < self.epsilon:
            action = numpy.random.randint(self.number_of_actions)
        else:
            action = values.argmax()
        self.actions[-1].append(action)
        return action, values

    def observe(self, reward):
        self.rewards[-1].append(reward)
        return self.iterate()

    def iterate(self):
        # Sample a minibatch of (state, action, reward, next state, terminal)
        # transitions from the stored episodes and run one training step.
        N = len(self.states)
        S = numpy.zeros((self.mbsz,) + self.state_size)
        NS = numpy.zeros((self.mbsz,) + self.state_size)
        A = numpy.zeros((self.mbsz, 1), dtype=numpy.int32)
        R = numpy.zeros((self.mbsz, 1), dtype=numpy.float32)
        T = numpy.zeros((self.mbsz, 1), dtype=numpy.int32)
        for i in range(self.mbsz):
            episode = random.randint(max(0, N - self.memory), N - 1)
            num_frames = len(self.states[episode])
            frame = random.randint(0, num_frames - 1)
            S[i] = self.states[episode][frame]
            T[i] = 1 if frame == num_frames - 1 else 0
            # A next state only exists for non-terminal frames.
            if frame < num_frames - 1:
                NS[i] = self.states[episode][frame + 1]
            A[i] = self.actions[episode][frame]
            R[i] = self.rewards[episode][frame]
        cost = self.train_fn([S, NS, A, R, T])
        return cost
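

# --- Hypothetical usage sketch ---------------------------------------------
# A minimal, illustrative driver loop showing one way this Agent could be
# wired to an environment. The tiny random "environment" below and its
# reset()/step() interface are assumptions for demonstration only, not any
# real emulator API; the channels-first (1, 84, 84) state shape is likewise
# an assumption matching the Theano dim ordering the network expects.
class _ToyEnv(object):
    """Stand-in environment emitting random 1x84x84 states and random rewards."""

    def __init__(self, state_shape=(1, 84, 84), episode_length=20):
        self.state_shape = state_shape
        self.episode_length = episode_length
        self.t = 0

    def reset(self):
        self.t = 0
        return numpy.random.random(self.state_shape).astype('float32')

    def step(self, action):
        self.t += 1
        state = numpy.random.random(self.state_shape).astype('float32')
        reward = float(numpy.random.random())
        done = self.t >= self.episode_length
        return state, reward, done


if __name__ == '__main__':
    env = _ToyEnv()
    agent = Agent(state_size=env.state_shape, number_of_actions=4,
                  epsilon=0.1, mbsz=32, discount=0.9,
                  save_name='basic', save_freq=10)
    for episode in range(10):
        agent.new_episode()      # must be called before the first act()
        state = env.reset()
        done = False
        while not done:
            action, values = agent.act(state)
            state, reward, done = env.step(action)
            cost = agent.observe(reward)   # one training step per observed reward
        agent.end_episode()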