0
点赞
收藏
分享

微信扫一扫

Spring Boot实现定时任务

前行的跋涉者 2024-08-20 阅读 15

基于pytorch的resnet实验:鸟类分类

Ⅰ Ⅰ Introduction:
  • 本文为机器学习使用resnet实现鸟类图片分类的实验,素材来自网络。
  • 学习目标:
    • 学习和理解resnetV和v2的区别
    • 基于tensorflow代码写出pytroch版本并跑通
Ⅱ Ⅱ Experiment:
  1. 数据准备与任务分析:
    数据通过网络下载完成
    resnetV2介绍与v1差别:
    ResNetV2 与 ResNetV1 的区别
  2. 残差块的设计
    ResNetV1: ResNetV1 的残差块是先进行卷积运算,然后再通过批归一化(Batch Normalization)和激活函数(ReLU)。这一设计可能在深层网络中出现梯度消失的问题,尤其在网络深度增加时更为明显。

公式:

ResNetV2: ResNetV2 提出了**预激活(Pre-activation)**的概念,首先对输入进行批归一化和 ReLU 激活,然后再进行卷积运算。这样可以缓解梯度消失问题,使得信息在反向传播时能更有效地通过残差块。

公式:

  1. 梯度传播的优化
    ResNetV1的梯度更新路径较长,梯度需要通过 ReLU 和卷积层反向传播到前面的层。随着网络加深,梯度衰减可能导致训练困难。

ResNetV2使用了预激活结构,梯度直接通过批归一化和残差连接传播到前面的层,这样可以更好地保持梯度流动,特别是在非常深的网络中性能表现优越。

  1. 性能差异
    ResNetV1在初期的实验中表现优异,能够训练非常深的网络并取得出色的性能,但其在非常深的网络(如超过50层)时,梯度消失问题依然存在。

ResNetV2在同样的深度下比 ResNetV1 更稳定,尤其是在更深的层数下(如 ResNet-101、ResNet-152 等),性能更优,梯度更加平滑。

实验总结
实现了 ResNetV2 中的 Residual Block
使用了预激活(Pre-activation)的残差块设计,通过在卷积操作前应用Batch Normalization和ReLU 激活,确保梯度更好地传播。
实现了卷积的**捷径(shortcut)**路径,通过 1x1 卷积进行维度匹配,确保输入和输出之间的通道和尺寸一致。
实现了完整的 ResNet50V2 架构
ResNet50V2 包含五个大的卷积层组(conv1 至 conv5),通过残差块进行堆叠,并通过 Stack2 来组合多个残差块形成网络的深度。
实现了可选的顶层池化和分类层,通过配置参数可选择是否使用全连接层(即分类层)或使用全局池化(average pooling 或 max pooling)。
实验要点
本次实现的 ResNetV2 结构通过残差连接解决了深层网络中的梯度消失问题,并使用预激活设计来优化梯度流动。相比 ResNetV1,V2 的设计在较深层次的模型上更加稳定。
模型可以配置是否包含顶层(全连接层),这使得该模型不仅适用于分类任务,还可以灵活地应用于其他计算机视觉任务,如特征提取、迁移学习等。

  1. 配置环境:
    语言环境:python 3.8
    编译器: pycharm
    深度学习环境:
    torch2.11
    cuda12.1
    torchvision
    0.15.2a0
    导入一切需要的包:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision
from torchvision import transforms, datasets
import os, PIL, pathlib, warnings
import torch.nn.functional as F
import matplotlib.pyplot as plt
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset
import torch.utils.data as data
from PIL import Image
import copy
import numpy as np
  1. 构建网络:
    为了提高模型性能,选择输入为3通道,经过4层卷积2层池化以及两层全连接输出最终结果,同时训练中加入BN与dropout方法。
class Block2(nn.Module):
def __init__(self, in_channel, filters, kernel_size=3, stride=1, conv_shortcut=False):
super(Block2, self).__init__()

# 预激活:BN + ReLU
self.preact = nn.Sequential(
nn.BatchNorm2d(in_channel),
nn.ReLU(inplace=True)
)

# Shortcut(捷径连接)部分
self.shortcut = conv_shortcut
if self.shortcut:
# 如果conv_shortcut为True,则使用1x1卷积调整输入通道和输出通道的一致性
self.short = nn.Conv2d(in_channel, 4 * filters, kernel_size=1, stride=stride, bias=False)
elif stride > 1:
# 如果需要降采样且没有捷径连接,使用MaxPool2d降采样
self.short = nn.MaxPool2d(kernel_size=1, stride=stride, padding=0)
else:
# 否则直接使用Identity保持输入不变
self.short = nn.Identity()

# 残差块的三层卷积
self.conv1 = nn.Sequential(
nn.Conv2d(in_channel, filters, kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(filters),
nn.ReLU(inplace=True)
)
self.conv2 = nn.Sequential(
nn.Conv2d(filters, filters, kernel_size=kernel_size, stride=stride, padding=1, bias=False),
nn.BatchNorm2d(filters),
nn.ReLU(inplace=True)
)
self.conv3 = nn.Conv2d(filters, 4 * filters, kernel_size=1, stride=1, bias=False)

def forward(self, x):
# 预激活处理输入
x1 = self.preact(x)

# 处理捷径连接
if self.shortcut:
x2 = self.short(x1)
else:
x2 = self.short(x)

# 残差路径
x1 = self.conv1(x1)
x1 = self.conv2(x1)
x1 = self.conv3(x1)

# 残差连接:输入加上残差路径
x = x1 + x2

return x


class ResNet50V2(nn.Module):
def __init__(self,
include_top=True, # 是否包含位于网络顶部的全连接层
preact=True, # 是否使用预激活
use_bias=False, # 是否对卷积层使用偏置
input_shape=[224, 224, 3], # 输入的图像大小
classes=1000, # 用于分类的类数量
pooling=None): # 全局池化类型,可选 "avg" 或 "max"
super(ResNet50V2, self).__init__()

# 第一层卷积 + 最大池化
self.conv1 = nn.Sequential()
self.conv1.add_module('conv', nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=use_bias))

# 如果不使用预激活,则在conv1之后加BN和ReLU
if not preact:
self.conv1.add_module('bn', nn.BatchNorm2d(64))
self.conv1.add_module('relu', nn.ReLU(inplace=True))

self.conv1.add_module('max_pool', nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

# 残差堆栈 (Stack of residual blocks)
self.conv2 = Stack2(64, 64, 3, stride=1)
self.conv3 = Stack2(256, 128, 4, stride=2)
self.conv4 = Stack2(512, 256, 6, stride=2)
self.conv5 = Stack2(1024, 512, 3, stride=2)

# 后处理部分
self.post = nn.Sequential()
if preact:
# 使用预激活时,最终加入一个BN + ReLU
self.post.add_module('bn', nn.BatchNorm2d(2048))
self.post.add_module('relu', nn.ReLU(inplace=True))

# 是否包含全连接层
if include_top:
self.post.add_module('avg_pool', nn.AdaptiveAvgPool2d((1, 1)))
self.post.add_module('flatten', nn.Flatten())
self.post.add_module('fc', nn.Linear(2048, classes))
else:
# 可选全局池化层
if pooling == 'avg':
self.post.add_module('avg_pool', nn.AdaptiveAvgPool2d((1, 1)))
elif pooling == 'max':
self.post.add_module('max_pool', nn.AdaptiveMaxPool2d((1, 1)))

def forward(self, x):
# 前向传播
x = self.conv1(x) # 初始卷积层
x = self.conv2(x) # 第一残差块
x = self.conv3(x) # 第二残差块
x = self.conv4(x) # 第三残差块
x = self.conv5(x) # 第四残差块
x = self.post(x) # 后处理
return x

class Stack2(nn.Module):
def __init__(self, in_channels, filters, blocks, stride=1):
super(Stack2, self).__init__()

# 第一个Block使用步幅进行降采样
self.blocks = nn.Sequential()
self.blocks.add_module('block_0', Block2(in_channels, filters, stride=stride, conv_shortcut=True))

# 其余Block保持输入大小不变
for i in range(1, blocks):
self.blocks.add_module(f'block_{i}', Block2(4 * filters, filters, stride=1, conv_shortcut=False))

def forward(self, x):
return self.blocks(x)
  1. 训练模型:
    模型的损失函数选用交叉熵,通过以下代码对模型进行更新:
def train(dataloader, model, optimizer, loss_fn, device):
"""训练模型的一个epoch。"""
size = len(dataloader.dataset)
num_batches = len(dataloader)
train_acc, train_loss = 0, 0

for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred = model(X)
loss = loss_fn(pred, y)

optimizer.zero_grad()
loss.backward()
optimizer.step()

train_loss += loss.item()
train_acc += (pred.argmax(1) == y).type(torch.float).sum().item()

train_loss /= num_batches
train_acc /= size

return train_acc, train_loss
  1. 测试模型:
    通过以下代码完成评估:
def test(dataloader, model, loss_fn, device):
"""测试模型的性能。"""
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss, test_acc = 0, 0

with torch.no_grad():
for imgs, target in dataloader:
imgs, target = imgs.to(device), target.to(device)
target_pred = model(imgs)
loss = loss_fn(target_pred, target)
test_loss += loss.item()
test_acc += (target_pred.argmax(1) == target).type(torch.float).sum().item()

test_acc /= size
test_loss /= num_batches

return test_acc, test_loss
  1. 实验结果及可视化:
    主函数训练代码即绘制图像执行如下:
if __name__ == "__main__":
# 设置设备
device = set_device()

# 配置matplotlib
configure_plot()

# 数据路径
data_dir = '/content/drive/MyDrive/J1/bird_photos'
data_dir = pathlib.Path(data_dir)

# 统计图片数量
image_count = count_images(data_dir)
print("图片总数为:", image_count)

# 获取类别名称
data_paths = list(data_dir.glob('*'))
classNames = [str(path).split('/')[-1] for path in data_paths]
print("类别名称:", classNames)

# 数据预处理
train_transforms = transforms.Compose([
transforms.Resize([224, 224]),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
transforms.Resize([224, 224]),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 加载数据
train_dataset, test_dataset, total_data = load_data(data_dir, train_transforms)

# 创建数据加载器
batch_size = 8
train_dl, test_dl = create_data_loaders(train_dataset, test_dataset, batch_size)

# 可视化部分图片
visualize_sample_images('/content/drive/MyDrive/J1/bird_photos/Black Skimmer/')

# 定义ResNet模型
model = ResNet50(block=ResNetblock, num_classes=len(classNames)).to(device)
print(model)

# 设置损失函数和优化器
loss_fn = nn.CrossEntropyLoss()
learn_rate = 1e-3
opt = torch.optim.Adam(model.parameters(), lr=learn_rate)

# 训练模型
epochs = 20
train_loss, train_acc, test_loss, test_acc = [], [], [], []
best_acc = 0

for epoch in range(epochs):
model.train()
epoch_train_acc, epoch_train_loss = train(train_dl, model, opt, loss_fn, device)
model.eval()
epoch_test_acc, epoch_test_loss = test(test_dl, model, loss_fn, device)

if epoch_test_acc > best_acc:
best_acc = epoch_test_acc
best_model = copy.deepcopy(model)

train_acc.append(epoch_train_acc)
train_loss.append(epoch_train_loss)
test_acc.append(epoch_test_acc)
test_loss.append(epoch_test_loss)

lr = opt.state_dict()['param_groups'][0]['lr']

template = ('Epoch:{:2d}, Train_acc:{:.1f}%, Train_loss:{:.3f}, Test_acc:{:.1f}%, Test_loss:{:.3f}, Lr:{:.2E}')
print(template.format(epoch + 1, epoch_train_acc * 100, epoch_train_loss,
epoch_test_acc * 100, epoch_test_loss, lr))

# 绘制结果
plot_results(epochs, train_acc, test_acc, train_loss, test_loss)

print('训练完成')

在这里插入图片描述

Ⅲ Ⅲ Conclusion:

通过本次实验,我们深入理解了 ResNetV2 相较于 ResNetV1 的改进之处,尤其是预激活的设计如何提升深层网络的梯度传播效率。实验中成功地实现了 ResNet50V2 的网络架构,展示了如何通过残差块的堆叠来构建深度卷积神经网络。

对于今后的任务,可以考虑在更深层的网络上使用 ResNetV2 以提高稳定性,特别是在需要更深层结构(如 ResNet-101、ResNet-152)时,V2 的优势会更加明显。

举报

相关推荐

0 条评论