RNN-BPTT Algorithm in Numpy

The BPTT (backpropagation through time) algorithm in RNNs.

Note: throughout this post, "*" denotes element-wise multiplication, and all other products are matrix multiplications.

Derivation

[Figure: rnn-1]

From the figure above, we have:
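A sketch of the forward pass, written to be consistent with the code below and with the sigmoid gradient used in the derivation (the symbols σ for the sigmoid activation, x_t for the one-hot input at step t, y_t for the one-hot label, and E for the total cross-entropy loss are notation introduced here):

$s_t = \sigma(U x_t + W s_{t-1})$

$o_t = \mathrm{softmax}(V s_t)$

$E_t = -\, y_t^{\top} \log o_t, \qquad E = \sum_t E_t$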

From the derivation above, we obtain the local gradients with respect to the parameters U and W:
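These local gradients come from the standard chain-rule expansion used in BPTT; as a sketch (k and j are summation indices introduced here):

$\dfrac{\partial E_t}{\partial W} = \sum_{k=0}^{t} \dfrac{\partial E_t}{\partial o_t}\, \dfrac{\partial o_t}{\partial s_t} \left( \prod_{j=k+1}^{t} \dfrac{\partial s_j}{\partial s_{j-1}} \right) \dfrac{\partial s_k}{\partial W}, \qquad \dfrac{\partial E_t}{\partial U} = \sum_{k=0}^{t} \dfrac{\partial E_t}{\partial o_t}\, \dfrac{\partial o_t}{\partial s_t} \left( \prod_{j=k+1}^{t} \dfrac{\partial s_j}{\partial s_{j-1}} \right) \dfrac{\partial s_k}{\partial U}$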

Background:

From the gradient formula of the cross-entropy loss function:

From the gradient formula of the sigmoid activation function:
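Written out under the notation above (with $z_t = V s_t$ denoting the pre-softmax scores, a symbol introduced here), these two standard facts are:

$\dfrac{\partial E_t}{\partial z_t} = o_t - y_t$

$\sigma'(z) = \sigma(z)\,(1 - \sigma(z)), \quad \text{so that} \quad \dfrac{\partial s_t}{\partial (U x_t + W s_{t-1})} = s_t * (1 - s_t)$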

Hence, equation (1.4) above can be written as:

Equation (1.5) can be written as:

Equation (1.6) can be written as:
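The rewritten forms are what the bptt code below implements; as a sketch, with $\delta_k^{(t)}$ (notation introduced here) denoting the error propagated from step t back to step k, and with the equation labels matching the comments in the code:

eq. (2.0): $\delta_t^{(t)} = V^{\top}(o_t - y_t) * s_t * (1 - s_t)$

eq. (2.1): $\delta_{k-1}^{(t)} = W^{\top}\delta_k^{(t)} * s_{k-1} * (1 - s_{k-1})$

eq. (1.9): $\dfrac{\partial E}{\partial V} = \sum_t (o_t - y_t)\, s_t^{\top}$

eq. (1.8): $\dfrac{\partial E}{\partial W} = \sum_t \sum_{k \le t} \delta_k^{(t)}\, s_{k-1}^{\top}$

eq. (1.7): $\dfrac{\partial E}{\partial U} = \sum_t \sum_{k \le t} \delta_k^{(t)}\, x_k^{\top}$

In the implementation, the inner sum over k is truncated to at most bptt_truncate time steps.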

Implementation

def bptt(self, x, y):
    '''
    x: one input sequence
    y: labels
    '''
    bptt_truncate = 10  # look back at most 10 time steps
    T = len(y)
    # forward propagation gives o = [o_1, o_2, ..., o_t, ..., o_n] and s = [s_1, s_2, ..., s_t, ..., s_n]
    o, s = self.forward_propagation(x)
    # initialize the gradients of the three parameters U, V, W to zero
    dLdU = np.zeros(self.U.shape)
    dLdV = np.zeros(self.V.shape)
    dLdW = np.zeros(self.W.shape)
    # (o - y): the gradient of softmax + cross-entropy
    delta_o = o
    delta_o[np.arange(len(y)), y] -= 1.

    for t in np.arange(T)[::-1]:
        dLdV += np.outer(delta_o[t], s[t])  # eq. (1.9)

    for t in np.arange(T)[::-1]:
        delta_t = self.V.T.dot(delta_o[t]) * (s[t] * (1 - s[t]))  # eq. (2.0), i.e. eq. (1.8) with k = t
        for bptt_step in np.arange(max(0, t - bptt_truncate), t + 1)[::-1]:
            # eq. (1.8)
            dLdW += np.outer(delta_t, s[bptt_step - 1])
            # eq. (2.1), i.e. eq. (1.8) with k != t
            delta_t = self.W.T.dot(delta_t) * (s[bptt_step - 1] * (1 - s[bptt_step - 1]))

        delta_t = self.V.T.dot(delta_o[t]) * (s[t] * (1 - s[t]))  # eq. (2.0), i.e. eq. (1.7) with k = t
        for bptt_step in np.arange(max(0, t - bptt_truncate), t + 1)[::-1]:
            # eq. (1.7): x[bptt_step] is a word index, so only that column of U receives the gradient
            dLdU[:, x[bptt_step]] += delta_t
            # eq. (2.1), i.e. eq. (1.7) with k != t
            delta_t = self.W.T.dot(delta_t) * (s[bptt_step - 1] * (1 - s[bptt_step - 1]))

    return [dLdU, dLdV, dLdW]

The complete RNN class

import sys
import operator
from datetime import datetime

import numpy as np


def softmax(x):
    # numerically stable softmax over a vector of scores; used by forward_propagation below
    e = np.exp(x - np.max(x))
    return e / e.sum()


class RNNNumpy(object):
    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        '''
        word_dim: vocabulary size
        hidden_dim: dimension of the hidden state
        bptt_truncate: look back at most 4 time steps (the bptt implementation above uses 10)
        '''
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate

        # Randomly initialize the network parameters, http://proceedings.mlr.press/v9/glorot10a/glorot10a.pdf
        self.U = np.random.uniform(-np.sqrt(1./word_dim), np.sqrt(1./word_dim), (hidden_dim, word_dim))
        self.V = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (word_dim, hidden_dim))
        self.W = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (hidden_dim, hidden_dim))

    def forward_propagation(self, x):
        # the total number of time steps
        T = len(x)
        # During forward propagation we save all hidden states in s;
        # the extra row s[-1] holds the initial hidden state of time step -1
        s = np.zeros((T + 1, self.hidden_dim))
        # the outputs at each time step
        o = np.zeros((T, self.word_dim))
        # for each time step
        for t in np.arange(T):
            # Note that we are indexing U by x[t] (a word index, i.e. a one-hot input).
            # Sigmoid activation, consistent with the s*(1-s) derivative used in the derivation and in bptt.
            s[t] = 1.0 / (1.0 + np.exp(-(self.U[:, x[t]] + self.W.dot(s[t-1]))))
            o[t] = softmax(self.V.dot(s[t]))
        return [o, s]

    def predict(self, x):
        # perform forward propagation and return the index of the highest score at each time step
        o, s = self.forward_propagation(x)
        return np.argmax(o, axis=1)

    def calculate_total_loss(self, x, y):
        '''
        Compute the total cross-entropy loss over the data set x
        '''
        L = 0
        for i in np.arange(len(y)):
            print("%d-th sentence" % (i))
            o, s = self.forward_propagation(x[i])
            # we only care about our prediction of the "correct" words
            # see: https://blog.csdn.net/u014380165/article/details/77284921
            correct_word_predictions = o[np.arange(len(y[i])), y[i]]
            # Add to the loss based on how off we were: accumulate the losses of all outputs
            L += -1 * np.sum(np.log(correct_word_predictions))
        return L

    def calculate_loss(self, x, y):
        # Divide the total loss by the number of training examples (the total number of words in y)
        N = np.sum([len(y_i) for y_i in y])
        return self.calculate_total_loss(x, y) / N

    def bptt(self, x, y):
        '''
        x: one input sequence
        y: labels
        '''
        bptt_truncate = 10  # look back at most 10 time steps
        T = len(y)
        # forward propagation gives o = [o_1, o_2, ..., o_t, ..., o_n] and s = [s_1, s_2, ..., s_t, ..., s_n]
        o, s = self.forward_propagation(x)
        # initialize the gradients of the three parameters U, V, W to zero
        dLdU = np.zeros(self.U.shape)
        dLdV = np.zeros(self.V.shape)
        dLdW = np.zeros(self.W.shape)
        # (o - y): the gradient of softmax + cross-entropy
        delta_o = o
        delta_o[np.arange(len(y)), y] -= 1.

        for t in np.arange(T)[::-1]:
            dLdV += np.outer(delta_o[t], s[t])  # eq. (1.9)

        for t in np.arange(T)[::-1]:
            delta_t = self.V.T.dot(delta_o[t]) * (s[t] * (1 - s[t]))  # eq. (2.0), i.e. eq. (1.8) with k = t
            for bptt_step in np.arange(max(0, t - bptt_truncate), t + 1)[::-1]:
                # eq. (1.8)
                dLdW += np.outer(delta_t, s[bptt_step - 1])
                # eq. (2.1), i.e. eq. (1.8) with k != t
                delta_t = self.W.T.dot(delta_t) * (s[bptt_step - 1] * (1 - s[bptt_step - 1]))

            delta_t = self.V.T.dot(delta_o[t]) * (s[t] * (1 - s[t]))  # eq. (2.0), i.e. eq. (1.7) with k = t
            for bptt_step in np.arange(max(0, t - bptt_truncate), t + 1)[::-1]:
                # eq. (1.7): x[bptt_step] is a word index, so only that column of U receives the gradient
                dLdU[:, x[bptt_step]] += delta_t
                # eq. (2.1), i.e. eq. (1.7) with k != t
                delta_t = self.W.T.dot(delta_t) * (s[bptt_step - 1] * (1 - s[bptt_step - 1]))

        return [dLdU, dLdV, dLdW]

    def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
        '''
        Check the gradients computed by bptt against numerically estimated gradients
        x: one sentence
        y: labels
        '''
        bptt_gradients = self.bptt(x, y)
        # list of all parameters we want to check
        model_parameters = ['U', 'V', 'W']
        # Gradient check for each parameter
        for pidx, pname in enumerate(model_parameters):
            # Get the actual parameter value from the model
            parameter = operator.attrgetter(pname)(self)
            print("Performing gradient check for parameter %s with size %d" % (pname, np.prod(parameter.shape)))

            # Iterate over each element of the parameter matrix
            it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
            while not it.finished:
                ix = it.multi_index
                # Save the original value so we can reset it later
                original_value = parameter[ix]
                # Estimate the gradient using (f(x+h)-f(x-h))/(2*h) for each component of the parameter
                # f(x+h)
                parameter[ix] = original_value + h
                gradplus = self.calculate_total_loss([x], [y])
                # f(x-h)
                parameter[ix] = original_value - h
                gradminus = self.calculate_total_loss([x], [y])
                # (f(x+h)-f(x-h))/(2*h)
                estimated_gradient = (gradplus - gradminus) / (2 * h)
                # reset parameter to original_value
                parameter[ix] = original_value
                # The gradient for this parameter calculated using backpropagation
                backprop_gradient = bptt_gradients[pidx][ix]
                # calculate the relative error: |x-y| / (|x|+|y|)
                relative_error = np.abs(backprop_gradient - estimated_gradient) / (np.abs(backprop_gradient) + np.abs(estimated_gradient))
                # if the error is too large, fail the gradient check
                if relative_error > error_threshold:
                    print("Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix))
                    print("+h Loss: %f" % gradplus)
                    print("-h Loss: %f" % gradminus)
                    print("Estimated gradient: %f" % estimated_gradient)
                    print("Backpropagation gradient: %f" % backprop_gradient)
                    print("Relative Error: %f" % relative_error)
                    return
                it.iternext()
            print("Gradient check for parameter %s passed" % pname)

    def sgd_step(self, x, y, learning_rate):
        '''Perform one SGD step: calculate the gradients and update the parameters'''
        dLdU, dLdV, dLdW = self.bptt(x, y)
        # Change parameters according to gradients and learning rate
        self.U -= learning_rate * dLdU
        self.V -= learning_rate * dLdV
        self.W -= learning_rate * dLdW

def train_with_sgd(model, X_train, y_train, learning_rate=0.005, n_epoch=10, evaluate_loss_after=2):
    # we keep track of the losses so we can plot them later
    losses = []
    num_examples_seen = 0
    for epoch in range(n_epoch):
        if epoch % evaluate_loss_after == 0:
            loss = model.calculate_loss(X_train, y_train)
            losses.append((num_examples_seen, loss))
            time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss))

            # Adjust the learning rate if the loss increases
            if len(losses) > 1 and losses[-1][1] > losses[-2][1]:
                learning_rate = learning_rate * 0.5
                print("Setting learning rate to %f" % learning_rate)
            sys.stdout.flush()

        # for each training example
        for i in range(len(y_train)):
            model.sgd_step(X_train[i], y_train[i], learning_rate)
            num_examples_seen += 1
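For completeness, here is a minimal usage sketch. The vocabulary size, toy data, and hyper-parameters below are made up for illustration only and are not from the original post:

import numpy as np

np.random.seed(10)
vocab_size = 80  # hypothetical vocabulary size

# toy corpus: each sentence is a sequence of word indices, and the target
# is the input shifted by one position (next-word prediction)
X_train = [np.random.randint(0, vocab_size, size=8) for _ in range(5)]
y_train = [np.append(x[1:], 0) for x in X_train]

model = RNNNumpy(vocab_size, hidden_dim=10)

# compare the analytic BPTT gradients with numerical estimates on one short sentence
model.gradient_check(X_train[0][:3], y_train[0][:3])

# a few epochs of SGD on the toy data
train_with_sgd(model, X_train, y_train, learning_rate=0.005, n_epoch=4, evaluate_loss_after=2)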
