Implementing a Deep Learning Framework from Scratch: GRU from Theory to Practice (Hands-On)


Introduction

In the spirit of "What I cannot create, I do not understand", this series builds a deep learning framework from scratch using only pure Python and NumPy. Like PyTorch, the framework supports automatic differentiation.

To understand deep learning thoroughly, the experience of building things from scratch is invaluable: we start from ideas we can fully grasp and implement the models we want while avoiding ready-made frameworks as much as possible. The goal of this series is that, through this process, you genuinely master the low-level implementation of deep learning rather than merely calling library APIs.

In the previous article we covered the theory behind the GRU. In the RNN hands-on article we also saw how to implement multi-layer and bidirectional RNNs; the GRU implemented here likewise supports multiple layers and both directions.

GRUCell

class GRUCell(Module):
    def __init__(self, input_size: int, hidden_size: int, bias: bool = False) -> None:
        super(GRUCell, self).__init__()
        # Fused linear transformation for x -> reset gate, x -> update gate and x -> g
        self.input_trans = Linear(input_size, 3 * hidden_size, bias=bias)
        # Fused linear transformation for h -> reset gate, h -> update gate and h -> g
        self.hidden_trans = Linear(hidden_size, 3 * hidden_size, bias=bias)

    def forward(self, x: Tensor, h: Tensor) -> Tensor:
        # r: reset gate
        # z: update gate
        # g: pre-activation of the candidate state g_t
        input_trans = self.input_trans(x)
        hidden_trans = self.hidden_trans(h)

        i_r, i_z, i_g = F.chunk(input_trans, 3, -1)
        h_r, h_z, h_g = F.chunk(hidden_trans, 3, -1)

        r = F.sigmoid(i_r + h_r)  # reset gate
        z = F.sigmoid(i_z + h_z)  # update gate

        # g = i_g + r * h_g, candidate state = tanh(g)
        h_next = z * h + (1 - z) * F.tanh(i_g + r * h_g)

        return h_next

As before, we fuse the similar linear transformations together and compute them in a single matrix multiplication; the small NumPy check below shows why this is equivalent to three separate projections.
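A minimal illustration in plain NumPy (independent of our framework, with made-up sizes): one fused projection followed by a split gives exactly the same result as three separate projections for the reset gate, the update gate and g.

import numpy as np

rng = np.random.default_rng(0)
batch_size, input_size, hidden_size = 4, 8, 16

x = rng.standard_normal((batch_size, input_size))
W = rng.standard_normal((input_size, 3 * hidden_size))  # fused weight for r, z, g

fused = x @ W                                    # one matrix multiplication
r_in, z_in, g_in = np.split(fused, 3, axis=-1)   # split into the three gate inputs

W_r, W_z, W_g = np.split(W, 3, axis=-1)          # the three "virtual" weight blocks
assert np.allclose(r_in, x @ W_r)
assert np.allclose(z_in, x @ W_z)
assert np.allclose(g_in, x @ W_g)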

The g introduced here is the pre-activation of the candidate state in the GRU equations, i.e. the quantity whose tanh becomes the candidate hidden state.
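Written out explicitly (biases omitted, since bias=False by default), the update computed by GRUCell above is:

$$
\begin{aligned}
r_t &= \sigma(W_{ir} x_t + W_{hr} h_{t-1}) \\
z_t &= \sigma(W_{iz} x_t + W_{hz} h_{t-1}) \\
g_t &= W_{ig} x_t + r_t \odot (W_{hg} h_{t-1}) \\
h_t &= z_t \odot h_{t-1} + (1 - z_t) \odot \tanh(g_t)
\end{aligned}
$$

where $\odot$ is element-wise multiplication. Note that $z_t$ gates how much of the old state is kept, matching the line h_next = z * h + (1 - z) * F.tanh(i_g + r * h_g) in the code.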

GRU

class GRU(Module):
    def __init__(self, input_size: int, hidden_size: int, batch_first: bool = False, num_layers: int = 1,
                 bidirectional: bool = False, dropout: float = 0):
        super(GRU, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.batch_first = batch_first
        self.bidirectional = bidirectional

        # Multi-layer support: the first cell maps input_size -> hidden_size,
        # the remaining cells map hidden_size -> hidden_size
        self.cells = ModuleList([GRUCell(input_size, hidden_size)] +
                                [GRUCell(hidden_size, hidden_size) for _ in range(num_layers - 1)])

        if self.bidirectional:
            # Bidirectional support: a separate stack of cells for the backward direction
            self.back_cells = copy.deepcopy(self.cells)

        self.dropout = dropout
        if dropout:
            # Dropout applied between stacked layers
            self.dropout_layer = Dropout(dropout)

    def _one_directional_op(self, input, cells, n_steps, hs, reverse=False):
        '''
        Runs the stacked cells over the whole sequence in one direction.

        Args:
            input: input of shape [n_steps, batch_size, input_size]
            cells: ModuleList of forward or backward GRUCells
            n_steps: number of time steps
            hs: list of per-layer hidden states
            reverse: True for the backward direction

        Returns:
            output and the stacked final hidden states h_n
        '''
        output = []
        for t in range(n_steps):
            inp = input[t]

            for layer in range(self.num_layers):
                hs[layer] = cells[layer](inp, hs[layer])
                inp = hs[layer]
                if self.dropout and layer != self.num_layers - 1:
                    inp = self.dropout_layer(inp)

            # Collect the output of the last layer at every time step
            output.append(hs[-1])

        output = F.stack(output)  # (n_steps, batch_size, hidden_size)

        if reverse:
            # Reverse the time dimension so that at t=0 the output has seen the whole sequence
            output = F.flip(output, 0)

        if self.batch_first:
            output = output.transpose((1, 0, 2))

        h_n = F.stack(hs)

        return output, h_n

    def forward(self, input: Tensor, h_0: Tensor) -> Tuple[Tensor, Tensor]:
        '''
        Forward pass of the GRU.

        :param input: shape [n_steps, batch_size, input_size] if batch_first=False
        :param h_0: shape [num_layers * num_directions, batch_size, hidden_size]
        :return:
            num_directions = 2 if self.bidirectional else 1

            output: (n_steps, batch_size, num_directions * hidden_size) if batch_first=False, or
                    (batch_size, n_steps, num_directions * hidden_size) if batch_first=True;
                    the last layer's output h_t for every time step
            h_n: (num_directions * num_layers, batch_size, hidden_size), the final hidden states
        '''

        is_batched = input.ndim == 3
        batch_dim = 0 if self.batch_first else 1
        if not is_batched:
            # Treat the input as a batch of size 1
            input = input.unsqueeze(batch_dim)
            if h_0 is not None:
                h_0 = h_0.unsqueeze(1)

        if self.batch_first:
            batch_size, n_steps, _ = input.shape
            input = input.transpose((1, 0, 2))  # move the batch to the middle dimension
        else:
            n_steps, batch_size, _ = input.shape

        if h_0 is None:
            num_directions = 2 if self.bidirectional else 1
            h = Tensor.zeros((self.num_layers * num_directions, batch_size, self.hidden_size), dtype=input.dtype,
                             device=input.device)
        else:
            h = h_0

        hs = list(F.unbind(h))  # split h into one state per layer (and direction)

        if not self.bidirectional:
            # Unidirectional case
            output, h_n = self._one_directional_op(input, self.cells, n_steps, hs)
        else:
            output_f, h_n_f = self._one_directional_op(input, self.cells, n_steps, hs[:self.num_layers])

            output_b, h_n_b = self._one_directional_op(F.flip(input, 0), self.back_cells, n_steps,
                                                       hs[self.num_layers:], reverse=True)

            output = F.cat([output_f, output_b], 2)
            h_n = F.cat([h_n_f, h_n_b], 0)

        return output, h_n
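A quick shape check of the layer (a hypothetical sketch only: it assumes Tensor can wrap a NumPy array and that modules are callable like in PyTorch):

import numpy as np

gru = GRU(input_size=10, hidden_size=20, num_layers=2, bidirectional=True)
x = Tensor(np.random.randn(5, 3, 10))  # (n_steps, batch_size, input_size)

output, h_n = gru(x, None)  # a zero initial state is created when h_0 is None
print(output.shape)  # (5, 3, 40): n_steps, batch_size, num_directions * hidden_size
print(h_n.shape)     # (4, 3, 20): num_directions * num_layers, batch_size, hidden_size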

As you can see, the GRU implementation is very similar to the RNN one, with a lot of duplicated code; the main difference between them is the cell. In the final code we therefore merge the three (RNN, LSTM and GRU) into a single implementation to reduce the amount of code and the maintenance burden; see the complete code for details. The sketch after this paragraph shows one way such a merge could look.
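This is only a minimal sketch of the idea, not the actual merged implementation from the complete code; the names BaseRNN and cell_cls are made up here. The stacking and direction logic lives in a shared base class, and each concrete layer just picks its cell:

class BaseRNN(Module):
    '''Hypothetical base class: shared stacking/direction logic, parameterized by the cell class.'''

    def __init__(self, cell_cls, input_size: int, hidden_size: int, batch_first: bool = False,
                 num_layers: int = 1, bidirectional: bool = False, dropout: float = 0):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.batch_first = batch_first
        self.bidirectional = bidirectional

        self.cells = ModuleList([cell_cls(input_size, hidden_size)] +
                                [cell_cls(hidden_size, hidden_size) for _ in range(num_layers - 1)])
        if bidirectional:
            self.back_cells = copy.deepcopy(self.cells)

        self.dropout = dropout
        if dropout:
            self.dropout_layer = Dropout(dropout)

    # _one_directional_op and forward are identical to the GRU versions above,
    # so they can move here unchanged.


class GRU(BaseRNN):
    def __init__(self, *args, **kwargs):
        super().__init__(GRUCell, *args, **kwargs)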

Part-of-Speech Tagging in Practice

Based on the GRU we implemented above, we now build the part-of-speech tagging model; it is also simply named GRU here:

class GRU(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim: int, output_dim: int, n_layers: int,
                 dropout: float, bidirectional: bool = False):
        super(GRU, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=n_layers, dropout=dropout,
                          bidirectional=bidirectional)

        num_directions = 2 if bidirectional else 1
        self.output = nn.Linear(num_directions * hidden_dim, output_dim)

    def forward(self, input: Tensor, hidden: Tensor = None) -> Tensor:
        embedded = self.embedding(input)
        # POS tagging uses the output of every time step
        output, _ = self.rnn(embedded, hidden)
        outputs = self.output(output)
        log_probs = F.log_softmax(outputs, axis=-1)
        return log_probs
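The model returns log-probabilities because it is trained with NLLLoss below; the log_softmax/NLLLoss pair computes the usual cross-entropy loss. A tiny plain-NumPy illustration of that pairing (toy numbers, not from the dataset):

import numpy as np

logits = np.array([2.0, 0.5, -1.0])                # scores for 3 tags at one position
log_probs = logits - np.log(np.exp(logits).sum())  # log_softmax
target = 0                                         # index of the correct tag
nll = -log_probs[target]                           # what NLLLoss computes per position
print(round(nll, 4))  # ~0.2413, identical to cross-entropy on the raw scores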

The procedure is similar to the one for the RNN; the training code is as follows:

embedding_dim = 128
hidden_dim = 128
batch_size = 32
num_epoch = 10
n_layers = 2
dropout = 0.2

# Load the data
train_data, test_data, vocab, pos_vocab = load_treebank()
train_dataset = RNNDataset(train_data)
test_dataset = RNNDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=train_dataset.collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=test_dataset.collate_fn, shuffle=False)

num_class = len(pos_vocab)

# Build the model
device = cuda.get_device("cuda:0" if cuda.is_available() else "cpu")
model = GRU(len(vocab), embedding_dim, hidden_dim, num_class, n_layers, dropout, bidirectional=True)
model.to(device)

# Training
nll_loss = NLLLoss()
optimizer = SGD(model.parameters(), lr=0.1)

model.train()  # make sure dropout is applied
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, targets, mask = [x.to(device) for x in batch]
        log_probs = model(inputs)
        loss = nll_loss(log_probs[mask], targets[mask])  # boolean indexing: padded positions are excluded from the loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss: {total_loss:.2f}")

# Testing
acc = 0
total = 0
model.eval()  # no dropout at evaluation time
for batch in tqdm(test_data_loader, desc=f"Testing"):
    inputs, targets, mask = [x.to(device) for x in batch]
    with no_grad():
        output = model(inputs)
        acc += (output.argmax(axis=-1).data == targets.data)[mask.data].sum().item()
        total += mask.sum().item()

# Print the accuracy on the test set
print(f"Acc: {acc / total:.2f}")

Output:

Loss: 104.13
Acc: 0.70

With the same configuration, the accuracy matches what we obtained with the LSTM.

For demonstration purposes, we only trained for 10 epochs here.
