MNIST Handwritten Digit Recognition

This is a Kaggle getting-started competition. Link: Digit Recognizer

1. Import the required modules

import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

2. Read the CSV files

dataframe_train_valid = pd.read_csv(os.path.join('input', 'train.csv'), dtype=np.float32)
dataframe_test = pd.read_csv(os.path.join('input', 'test.csv'), dtype=np.float32)
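
A quick sanity check on what was loaded (a minimal sketch, assuming the standard Digit Recognizer layout: train.csv holds a label column plus 784 pixel columns, test.csv holds only the pixel columns):

# Expected shapes for this competition: (42000, 785) and (28000, 784)
print(dataframe_train_valid.shape, dataframe_test.shape)
# Class balance of the digit labels
print(dataframe_train_valid.label.value_counts().sort_index())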

3. Prepare the data

class mnist_data(Dataset):
    def __init__(self, split, dataframe, transform):
        if split in ('train', 'valid'):
            labels = dataframe.label.values
            # Pixel values in the CSV are 0-255 floats; rescale to [0, 1] so that
            # Normalize(mean=0.1307, std=0.3081) matches the usual MNIST statistics
            # (ToTensor does not rescale float input)
            features = dataframe.loc[:, dataframe.columns != "label"].values / 255.0
            # Split into training and validation sets
            features_train, features_valid, labels_train, labels_valid = \
                train_test_split(features, labels, test_size=0.2, random_state=0)
            if split == 'train':
                self.X = features_train.reshape((-1, 28, 28))
                self.y = labels_train
            else:
                self.X = features_valid.reshape((-1, 28, 28))
                self.y = labels_valid
        elif split == 'test':
            self.X = dataframe.values.reshape((-1, 28, 28)) / 255.0
            self.y = None
        self.transform = transform

    def __getitem__(self, index):
        if self.y is not None:
            return self.y[index], self.transform(self.X[index])
        else:
            return self.transform(self.X[index])

    def __len__(self):
        return self.X.shape[0]

batch_size = 256
# The same transform is used for all three splits
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.1307,), std=(0.3081,))
])
train_dataset = mnist_data('train', dataframe_train_valid, transform=transform)
valid_dataset = mnist_data('valid', dataframe_train_valid, transform=transform)
test_dataset = mnist_data('test', dataframe_test, transform=transform)
# Shuffle only the training set; validation and test order must stay fixed
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
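
To confirm the pipeline produces what the model expects, a minimal sketch that pulls one sample back out of train_dataset and displays it (after ToTensor the image tensor has shape (1, 28, 28)):

label, image = train_dataset[0]
print(image.shape, label)                 # torch.Size([1, 28, 28]) and a float label
plt.imshow(image.squeeze().numpy(), cmap='gray')
plt.title('label: {}'.format(int(label)))
plt.show()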

4. Define the model

# Basic building block: Conv-BN-ReLU-Conv-BN followed by a stride-2 max pool that halves the spatial size
class BasicBlock(nn.Module):  
    def __init__(self, in_channel, out_channel, kernel_size, stride, padding): 
        super(BasicBlock, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size, stride, padding),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channel, out_channel, kernel_size, stride, padding),
            nn.BatchNorm2d(out_channel),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
    
    def forward(self, x):
        out = self.model(x)
        return out

# The full model: three BasicBlocks shrink the 28x28 input to 4x4; a final pool and a small fully connected head produce the 10 class scores
class Net(nn.Module):
    def __init__(self, num_classes=10):
        super(Net, self).__init__()
        self.model = nn.Sequential(
            BasicBlock(1, 64, 3, 1, 1),
            BasicBlock(64, 64, 3, 1, 1),
            BasicBlock(64, 256, 3, 1, 1)
        )
        self.maxpool = nn.MaxPool2d(3)  # pools the 4x4 feature map down to 1x1
        self.fc = nn.Sequential(
            nn.Linear(256, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, num_classes)
        )
    def forward(self, x):
        out = self.model(x)
        out = self.maxpool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
    
# Model factory: build the network and optionally load saved weights
def my_net(pretrained=False, **kwargs):
    model = Net(**kwargs)
    if pretrained:
        # Load the checkpoint saved during training
        model.load_state_dict(torch.load('mymodel.pt'))
    return model
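
Before training, a quick shape check with a dummy batch (a minimal sketch; eval mode is used so BatchNorm does not need batch statistics). The three blocks shrink 28x28 to 4x4 and the final pool reduces that to 1x1, so a batch of N images should yield an (N, 10) logit tensor:

net = my_net(pretrained=False)
net.eval()
with torch.no_grad():
    dummy = torch.randn(2, 1, 28, 28)     # two fake grayscale images
    print(net(dummy).shape)               # expected: torch.Size([2, 10])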

5. Training

model = my_net(pretrained=False)
# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# Loss function
criterion = nn.CrossEntropyLoss()
# Halve the learning rate whenever the monitored loss plateaus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                       patience=1,
                                                       factor=0.5,
                                                       min_lr=1e-5)
# Train on the GPU when one is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
criterion.to(device)
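
Optionally, a one-liner to gauge the model's size before committing to 80 epochs (purely informational):

num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print('trainable parameters: {:,}'.format(num_params))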

count = 0
loss_list = []
iteration_list = []
accuracy_list = []
best_accuracy = 0
model.train()
for epoch in range(80):
    for labels, images in train_dataloader:
        images = images.float().to(device)
        labels = labels.long().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        count += 1
        if count % 50 == 0:
            # Evaluate the loss and the model's accuracy on the validation set
            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for v_labels, v_images in valid_dataloader:
                    v_images = v_images.float().to(device)
                    v_labels = v_labels.long().to(device)
                    outputs = model(v_images)
                    predicted = torch.max(outputs, 1)[1]
                    total += len(v_labels)
                    correct += (predicted == v_labels).sum().item()
            model.train()
            accuracy = 100 * correct / float(total)
            loss_list.append(loss.item())
            iteration_list.append(count)
            accuracy_list.append(accuracy)
            print('Iteration: {}  Loss: {:.4f}  Accuracy: {:.2f} %'.format(count, loss.item(), accuracy))
            # Save a checkpoint whenever validation accuracy improves
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                torch.save(model.state_dict(), 'mymodel.pt')
    # Step the scheduler on the last training-batch loss of the epoch
    scheduler.step(loss)

6. Visualize the training process

# Plot the loss curve
plt.plot(iteration_list, loss_list)
plt.xlabel("Number of iterations")
plt.ylabel("Loss")
plt.title("Loss vs. number of iterations")
plt.savefig('loss.png')
plt.show()

# Plot the accuracy curve
plt.plot(iteration_list, accuracy_list, color="red")
plt.xlabel("Number of iterations")
plt.ylabel("Accuracy")
plt.title("Accuracy vs. number of iterations")
plt.savefig('accuracy.png')
plt.show()

7. Testing

# Reload the best checkpoint and switch to inference mode
model = my_net(pretrained=True)
model.to(device)
model.eval()
prediction = []
with torch.no_grad():
    for images in test_dataloader:
        images = images.float().to(device)
        outputs = model(images)
        predicted = torch.max(outputs, 1)[1]
        prediction.append(predicted.cpu())

8. Generate the submission file

# Concatenate the per-batch predictions into one flat array
p = np.hstack([x.numpy() for x in prediction])
print(p.shape)
submission = pd.DataFrame({
    "ImageId": np.arange(len(p)) + 1,
    "Label": p
})
submission.to_csv('my_submission_resnet.csv', index=False)
print(submission)
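
As a final check, the submission should contain exactly one row per test image, with ImageId starting at 1 (a minimal sanity check against the dataframe loaded earlier):

assert len(submission) == len(dataframe_test)   # one prediction per test image
assert submission.ImageId.iloc[0] == 1          # Kaggle expects 1-based ImageId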

9. Results

Original post (in Chinese): https://www.cnblogs.com/wang-haoran/p/13424734.html