A PyTorch introductory example: MNIST

言午栩 2022-04-16
pytorch

Import libraries

from typing import *

import os
import sys
import random
import pickle
import logging
from datetime import datetime

try:
    from tqdm.notebook import tqdm as _tqdm
except ImportError:
    # fall back to a no-op wrapper when tqdm is not installed
    _tqdm = lambda x: x

import numpy as np
from matplotlib import pyplot as plt

import torch as t
import torch.nn as nn
import torch.optim as optim

import torchvision as tv
from torch.utils.data import DataLoader

Prepare the dataset

data_dir = '.'
batch_size = 256
train_dataset = tv.datasets.MNIST(root=data_dir, train=True, transform=tv.transforms.ToTensor(), download=True)
test_dataset = tv.datasets.MNIST(root=data_dir, train=False, transform=tv.transforms.ToTensor(), download=True)
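
A quick sanity check (optional sketch): each dataset item is a (1, 28, 28) float tensor in [0, 1] plus an integer label, and the matplotlib import above can display one sample.

# optional: inspect a single sample (assumes the download above succeeded)
img, label = train_dataset[0]
print(img.shape, img.dtype, float(img.min()), float(img.max()), label)
plt.imshow(img.squeeze(0), cmap='gray')
plt.title(f'label: {label}')
plt.show()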

Logging and random seeds

logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
    stream=sys.stdout,
    force=True,
)

_SEED = 0
random.seed(_SEED)
np.random.seed(_SEED)
t.manual_seed(_SEED)
t.cuda.manual_seed_all(_SEED)
# t.backends.cudnn.benchmark = False       # can hurt performance
# t.backends.cudnn.deterministic = True    # can hurt performance
# t.use_deterministic_algorithms(True)     # needs CUBLAS_WORKSPACE_CONFIG set on CUDA >= 10.2

# the expected values below come from one specific environment; they may differ
# across Python/NumPy/PyTorch versions, in which case these asserts will fail
assert random.random() == 0.662743203727557, 'different random generator'
assert np.random.rand() == 0.11717829958543136, 'different random generator'
assert t.rand((1,)).item() == 0.997646152973175, 'different random generator'

if t.cuda.is_available():
    for i in range(t.cuda.device_count()):
        logging.info(t.cuda.get_device_properties(i))
    logging.warning('using cuda:0')
    device = t.device('cuda:0')
else:
    logging.warning('using cpu')
    device = t.device('cpu')

def gpu_state(gpu_id, get_return=False):
    """Query nvidia-smi for the given comma-separated GPU ids.

    By default the usage is only logged; with get_return=True, a dict mapping
    each id to its free memory in MiB is returned instead.
    """
    qargs = ['index', 'gpu_name', 'memory.used', 'memory.total']
    cmd = 'nvidia-smi --query-gpu={} --format=csv,noheader'.format(','.join(qargs))
    results = os.popen(cmd).readlines()
    gpu_id_list = gpu_id.split(",")
    gpu_space_available = {}
    for cur_state in results:
        cur_state = cur_state.strip().split(", ")
        for i in gpu_id_list:
            if i == cur_state[0]:
                if not get_return:
                    logging.info(f'GPU {i} {cur_state[1]}: Memory-Usage {cur_state[2]} / {cur_state[3]}.')
                else:
                    # strip the "MiB" suffix and keep only the digits
                    total = int("".join(filter(str.isdigit, cur_state[3])))
                    used = int("".join(filter(str.isdigit, cur_state[2])))
                    gpu_space_available[i] = total - used
    if get_return:
        return gpu_space_available
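
A usage sketch for gpu_state (assumes nvidia-smi is on the PATH): passing only the id logs the current usage, while get_return=True returns the free memory in MiB per GPU.

# optional: check GPU memory before training (requires nvidia-smi)
if t.cuda.is_available():
    gpu_state('0')                               # logs "GPU 0 <name>: Memory-Usage used / total."
    free_mib = gpu_state('0', get_return=True)   # e.g. {'0': 10240}
    logging.info(f'free GPU memory (MiB): {free_mib}')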

Neural network

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=1,  out_channels=16, kernel_size=3,
            padding=1, padding_mode='zeros', stride=1, dilation=1,
            groups=1, bias=True,
        )
        self.conv2 = nn.Conv2d(
            in_channels=16, out_channels=32, kernel_size=3,
            padding=1, padding_mode='circular', stride=1, dilation=1,
            groups=4, bias=True,
        )
        self.pool = nn.MaxPool2d(
            kernel_size=2, padding=0, stride=2, dilation=1,
            return_indices=False, ceil_mode=False,
        )
        self.fc1 = nn.Linear(32*7*7, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)
        self.preluConv1 = nn.PReLU(num_parameters=1, init=0.5)
        self.preluConv2 = nn.PReLU(num_parameters=1, init=0.5)
        self.preluFC1 = nn.PReLU(num_parameters=1, init=0.5)
        self.preluFC2 = nn.PReLU(num_parameters=1, init=0.5)
        self.preluFC3 = nn.PReLU(num_parameters=1, init=0.5)
        
    def forward(self, x):
        x = x.view(x.size()[0], 1, 28, 28)
        x = self.conv1(x)
        x = self.preluConv1(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.preluConv2(x)
        x = self.pool(x)
        x = x.view(x.size()[0], 32*7*7)
        x = self.fc1(x)
        x = self.preluFC1(x)
        x = self.fc2(x)
        x = self.preluFC2(x)
        x = self.fc3(x)
        x = self.preluFC3(x)

        return x 
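
The two pooling layers halve the spatial size twice (28 → 14 → 7), so the flattened feature size is 32*7*7 = 1568 and the output is 10 logits per sample. A quick shape check (sketch):

# optional: verify the output shape with a dummy batch of two flattened images
_net = Net()
_dummy = t.zeros(2, 28 * 28)      # forward() reshapes to (N, 1, 28, 28)
assert _net(_dummy).shape == (2, 10)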

Training

epochs = 4
batch_size = 100  # overrides the batch_size defined in the dataset section

def _train_test_routine(model: nn.Module, lossFunc: nn.Module, optimizer: optim.Optimizer):

    print(model)
    print(lossFunc)
    print(optimizer)

    model.to(device=device, non_blocking=False)

    train_loss_seq = []
    train_acc_seq = []
    test_loss_seq = []
    test_acc_seq = []

    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    train_loader = DataLoader(train_dataset, batch_size=batch_size)

    for epoch in range(epochs):

        logging.info(f'epoch {epoch:^3d} start')

        model.train()

        train_acc = 0.0
        train_loss = 0.0

        for _Xbatch, _Ybatch in train_loader:
            Xbatch = _Xbatch.to(device=device, non_blocking=True)
            Ybatch = _Ybatch.to(device=device, non_blocking=True)
            optimizer.zero_grad()
            PYHbatch = model(Xbatch)
            loss = lossFunc(PYHbatch,Ybatch)
            loss.backward()
            optimizer.step()
            YHbatch = t.argmax(PYHbatch, dim=1)
            train_acc += (YHbatch == Ybatch).sum().item()
            train_loss += loss.item()

        train_acc_seq.append(train_acc/len(train_loader.dataset))
        train_loss_seq.append(train_loss/len(train_loader.dataset))

        model.eval()

        with t.no_grad():

            test_acc = 0.0
            test_loss = 0.0

            for _Xbatch, _Ybatch in test_loader:
                Xbatch = _Xbatch.to(device=device, non_blocking=True)
                Ybatch = _Ybatch.to(device=device, non_blocking=True)
                PYHbatch = model(Xbatch)
                loss = lossFunc(PYHbatch,Ybatch)
                YHbatch = t.argmax(PYHbatch, dim=1)
                test_acc += (YHbatch == Ybatch).sum().item()
                test_loss += loss.item()

            test_acc_seq.append(test_acc/len(test_loader.dataset))
            test_loss_seq.append(test_loss/len(test_loader.dataset))

            logging.info(
                f'epoch {epoch:^3d} end with\n'
                f'  test_acc {test_acc_seq[-1]:.5f}\n'
                f' test_loss {test_loss_seq[-1]:.5f}\n'
                f' train_acc {train_acc_seq[-1]:.5f}\n'
                f'train_loss {train_loss_seq[-1]:.5f}\n'
            )

    # return the collected metrics so they can be inspected or plotted
    return train_loss_seq, train_acc_seq, test_loss_seq, test_acc_seq

def train_test_routine():
    net = Net()
    lossFunc = nn.CrossEntropyLoss(reduction='sum')
    optimizer = optim.AdamW(net.parameters(), lr=1e-3, amsgrad=True)
    return _train_test_routine(net, lossFunc, optimizer)
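
A usage sketch: calling train_test_routine() trains for the configured number of epochs and returns the four metric sequences, which can then be plotted with the matplotlib import from the top.

# run training and plot the accuracy curves (sketch; uses the GPU if one was detected above)
train_loss_seq, train_acc_seq, test_loss_seq, test_acc_seq = train_test_routine()
plt.plot(train_acc_seq, label='train acc')
plt.plot(test_acc_seq, label='test acc')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
plt.show()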