Import libraries
from typing import *
import os
import sys
import random
import pickle
import logging
from datetime import datetime
try:
    from tqdm.notebook import tqdm as _tqdm
except ImportError:
    # fall back to a no-op wrapper when tqdm is unavailable
    _tqdm = lambda x: x
import numpy as np
from matplotlib import pyplot as plt
import torch as t
import torch.nn as nn
import torch.optim as optim
import torchvision as tv
from torch.utils.data import DataLoader
Prepare the dataset
data_dir = '.'
batch_size = 256  # note: overridden to 100 in the training section below
train_dataset = tv.datasets.MNIST(root=data_dir, train=True, transform=tv.transforms.ToTensor(), download=True)
test_dataset = tv.datasets.MNIST(root=data_dir, train=False, transform=tv.transforms.ToTensor(), download=True)
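As a quick sanity check (a minimal sketch added here, not part of the original code), ToTensor should yield 1x28x28 float tensors scaled to [0, 1] with integer labels:
sample_img, sample_label = train_dataset[0]
print(sample_img.shape, sample_img.dtype, sample_label)  # torch.Size([1, 28, 28]) torch.float32 5
print(len(train_dataset), len(test_dataset))             # 60000 10000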
Logging and seeding
logging.basicConfig(
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO,
stream=sys.stdout,
force=True,
)
_SEED = 0
random.seed(_SEED)
np.random.seed(_SEED)
t.manual_seed(_SEED)
t.cuda.manual_seed_all(_SEED)
# Sanity-check the first draw from each seeded generator (seed 0). The Python and
# NumPy values are stable across versions; the torch value was observed with a
# recent CPU build of PyTorch and may differ between releases.
assert random.random() == 0.8444218515250481, 'different random generator'
assert np.random.rand() == 0.5488135039273248, 'different random generator'
assert t.rand((1,)).item() == 0.49625658988952637, 'different random generator'
if t.cuda.is_available():
for i in range(t.cuda.device_count()):
logging.info(t.cuda.get_device_properties(i))
logging.warning('using cuda:0')
device = t.device('cuda:0')
else:
logging.warning('using cpu')
device = t.device('cpu')
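If bit-exact reproducibility on GPU is also needed, cuDNN autotuning can be disabled as well; this optional snippet (an addition, not in the original notebook) uses the standard PyTorch flags and trades some speed for determinism:
t.backends.cudnn.deterministic = True   # force deterministic cuDNN kernels
t.backends.cudnn.benchmark = False      # disable autotuning of convolution algorithms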
def gpu_state(gpu_id, get_return=False):
    """Query nvidia-smi for the GPUs in `gpu_id` (a comma-separated string of indices).

    By default the usage of each requested GPU is logged; with get_return=True a dict
    mapping GPU index to free memory (in MiB) is returned instead.
    """
    qargs = ['index', 'gpu_name', 'memory.used', 'memory.total']
    cmd = 'nvidia-smi --query-gpu={} --format=csv,noheader'.format(','.join(qargs))
    results = os.popen(cmd).readlines()
    gpu_id_list = gpu_id.split(',')
    gpu_space_available = {}
    for cur_state in results:
        cur_state = cur_state.strip().split(', ')
        for i in gpu_id_list:
            if i == cur_state[0]:
                if not get_return:
                    logging.info(f'GPU {i} {cur_state[1]}: Memory-Usage {cur_state[2]} / {cur_state[3]}.')
                else:
                    # strip the ' MiB' suffix and report free = total - used
                    used = int(''.join(filter(str.isdigit, cur_state[2])))
                    total = int(''.join(filter(str.isdigit, cur_state[3])))
                    gpu_space_available[i] = total - used
    if get_return:
        return gpu_space_available
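A hypothetical usage of gpu_state, assuming nvidia-smi is on the PATH (this call is illustrative and not part of the original code):
if t.cuda.is_available():
    gpu_state('0')                              # logs name and memory usage of GPU 0
    free_mib = gpu_state('0', get_return=True)  # e.g. {'0': 10240}, free memory in MiB
    logging.info(f'free GPU memory: {free_mib}')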
Neural network
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(
in_channels=1, out_channels=16, kernel_size=3,
padding=1, padding_mode='zeros', stride=1, dilation=1,
groups=1, bias=True,
)
self.conv2 = nn.Conv2d(
in_channels=16, out_channels=32, kernel_size=3,
padding=1, padding_mode='circular', stride=1, dilation=1,
groups=4, bias=True,
)
self.pool = nn.MaxPool2d(
kernel_size=2, padding=0, stride=2, dilation=1,
return_indices=False, ceil_mode=False,
)
self.fc1 = nn.Linear(32*7*7, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, 10)
self.preluConv1 = nn.PReLU(num_parameters=1, init=0.5)
self.preluConv2 = nn.PReLU(num_parameters=1, init=0.5)
self.preluFC1 = nn.PReLU(num_parameters=1, init=0.5)
self.preluFC2 = nn.PReLU(num_parameters=1, init=0.5)
self.preluFC3 = nn.PReLU(num_parameters=1, init=0.5)
    def forward(self, x):
        x = x.view(x.size(0), 1, 28, 28)    # (N, 1, 28, 28)
        x = self.conv1(x)                   # (N, 16, 28, 28)
        x = self.preluConv1(x)
        x = self.pool(x)                    # (N, 16, 14, 14)
        x = self.conv2(x)                   # (N, 32, 14, 14)
        x = self.preluConv2(x)
        x = self.pool(x)                    # (N, 32, 7, 7)
        x = x.view(x.size(0), 32*7*7)       # flatten to (N, 1568)
        x = self.fc1(x)
        x = self.preluFC1(x)
        x = self.fc2(x)
        x = self.preluFC2(x)
        x = self.fc3(x)
        x = self.preluFC3(x)                # PReLU on the logits; CrossEntropyLoss applies log-softmax itself
        return x
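A quick shape check for the network (an illustrative sketch): after two 2x2 poolings a 28x28 image becomes 7x7 with 32 channels, so the flattened size 32*7*7 = 1568 matches fc1, and a dummy batch of two images should produce two 10-way logit vectors:
_net = Net()
with t.no_grad():
    _logits = _net(t.zeros(2, 1, 28, 28))
print(_logits.shape)  # torch.Size([2, 10])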
Training
epochs = 4
batch_size = 100
def _train_test_routine(model: nn.Module, lossFunc: nn.Module, optimizer: optim.Optimizer):
    """Train `model` on MNIST for `epochs` epochs, evaluating on the (unshuffled) test loader after each epoch."""
print(model)
print(lossFunc)
print(optimizer)
model.to(device=device, non_blocking=False)
train_loss_seq = []
train_acc_seq = []
test_loss_seq = []
test_acc_seq = []
test_loader = DataLoader(test_dataset, batch_size=batch_size)
train_loader = DataLoader(train_dataset, batch_size=batch_size)
for epoch in range(epochs):
logging.info(f'epoch {epoch:^3d} start')
model.train()
train_acc = 0.0
train_loss = 0.0
for _Xbatch, _Ybatch in train_loader:
Xbatch = _Xbatch.to(device=device, non_blocking=True)
Ybatch = _Ybatch.to(device=device, non_blocking=True)
optimizer.zero_grad()
PYHbatch = model(Xbatch)
            loss = lossFunc(PYHbatch, Ybatch)
loss.backward()
optimizer.step()
YHbatch = t.argmax(PYHbatch, dim=1)
train_acc += (YHbatch == Ybatch).sum().item()
train_loss += loss.item()
        # normalize by the number of training samples (robust even if the last batch is partial)
        train_acc_seq.append(train_acc / len(train_loader.dataset))
        train_loss_seq.append(train_loss / len(train_loader.dataset))
model.eval()
with t.no_grad():
test_acc = 0.0
test_loss = 0.0
for _Xbatch, _Ybatch in test_loader:
Xbatch = _Xbatch.to(device=device, non_blocking=True)
Ybatch = _Ybatch.to(device=device, non_blocking=True)
PYHbatch = model(Xbatch)
                loss = lossFunc(PYHbatch, Ybatch)
YHbatch = t.argmax(PYHbatch, dim=1)
test_acc += (YHbatch == Ybatch).sum().item()
test_loss += loss.item()
            # normalize by the number of test samples
            test_acc_seq.append(test_acc / len(test_loader.dataset))
            test_loss_seq.append(test_loss / len(test_loader.dataset))
logging.info(
f'epoch {epoch:^3d} end with\n'
f' test_acc {test_acc_seq[-1]:.5f}\n'
f' test_loss {test_loss_seq[-1]:.5f}\n'
f' train_acc {train_acc_seq[-1]:.5f}\n'
f'train_loss {train_loss_seq[-1]:.5f}\n'
)
return
def train_test_routine():
    net = Net()
    lossFunc = nn.CrossEntropyLoss(reduction='sum')  # summed over each batch; averaged per sample when logged
    optimizer = optim.AdamW(net.parameters(), lr=1e-3, amsgrad=True)
    _train_test_routine(net, lossFunc, optimizer)
    return
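The experiment is then launched by calling the routine; the __main__ guard below is an addition for running this as a plain script rather than a notebook cell:
if __name__ == '__main__':
    train_test_routine()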