This is a deep learning project on the Food-101 dataset: 101,000 images across 101 food classes. I experimented with ResNet-50 and DenseNet, and the final model is DenseNet121. Transfer learning is used: the backbone is pretrained on ImageNet, which gives it strong feature-extraction ability out of the box.
Training environment: Google Cloud Platform with an Nvidia Tesla P100
Accuracy: Top-1 84%, Top-5 93%
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import time
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms, models
from torch.utils.tensorboard import SummaryWriter
from autoaugment import ImageNetPolicy  # local AutoAugment module providing the ImageNet augmentation policy
# Use GPU if it's available
if torch.cuda.is_available():
    device = torch.device("cuda")
    # Note: this makes every newly created tensor a CUDA tensor,
    # which can clash with CPU-side DataLoader workers; set with care.
    torch.set_default_tensor_type(torch.cuda.FloatTensor)
else:
    device = torch.device("cpu")
    torch.set_default_tensor_type(torch.FloatTensor)
# Hyper-parameters
num_epochs = 100
learning_rate = 0.001  # used by the Adam optimizers below
batch_size = 128
data_path = r"/home/joezhou1994001_gmail_com/dataset/food-101"
data_path = r"/home/ec2-user/dataset/food-101"
data_path = r"/Volumes/SD/dataset/food-101"
# TODO: Define transforms for the training data and testing data
train_transforms = transforms.Compose([transforms.RandomRotation(30),
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(), ImageNetPolicy(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])])
test_transforms = transforms.Compose([transforms.Resize(255),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])])
# Pass the transforms in here, then run the preview cell below to see how they look
train_data = datasets.ImageFolder(data_path + r'/train', transform=train_transforms)
val_data = datasets.ImageFolder(data_path + r'/val', transform=test_transforms)
test_data = datasets.ImageFolder(data_path + r'/test', transform=test_transforms)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_data, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=64, shuffle=True)
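A minimal preview cell (my addition), un-normalizing a few augmented training images with the same ImageNet statistics used in the transforms:
# Preview a few augmented training images.
images, labels = next(iter(train_loader))
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
fig, axes = plt.subplots(1, 4, figsize=(12, 3))
for ax, img, lbl in zip(axes, images, labels):
    img = img.cpu().numpy().transpose(1, 2, 0)   # CHW -> HWC
    img = np.clip(std * img + mean, 0, 1)        # undo the normalization
    ax.imshow(img)
    ax.set_title(train_data.classes[lbl])
    ax.axis('off')
plt.show()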
# DenseNet121
model = models.densenet121(pretrained=True)
# ResNet-50 alternative:
# model = models.resnet50(pretrained=True)
# model
######################################################
# Run this block the first time you run the notebook,
# as a sanity check to make sure the model works.
######################################################
# Freeze parameters so we don't backprop through them
# (keep the pretrained weights fixed)
for param in model.parameters():
    param.requires_grad = False
# Replace the last fully connected layer: 1024 -> 500 -> 101
from collections import OrderedDict
classifier = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(1024, 500)),
    ('relu', nn.ReLU()),
    ('fc2', nn.Linear(500, 101)),
    ('output', nn.LogSoftmax(dim=1))
]))
model.classifier = classifier
model.to(device)
criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(model.classifier.parameters(), lr=learning_rate)
start = time.time()
for ii, (inputs, labels) in enumerate(train_loader):
    # Move input and label tensors to the GPU
    inputs, labels = inputs.to(device), labels.to(device)
    optimizer.zero_grad()
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    # Only time a few batches; this is just a smoke test.
    if ii == 3:
        break
print(f"Device = {device}; Time per batch: {(time.time() - start)/4:.3f} seconds")
The pretrained model's final layer maps 1024 features to the 1000 ImageNet classes. Here we have 101 classes, so we replace it.
# model = models.densenet201(pretrained=True)
# Freeze parameters so we don't backprop through them
for param in model.parameters():
    param.requires_grad = False
model.classifier = nn.Sequential(
    nn.Linear(1024, 512),
    nn.LeakyReLU(),
    nn.Linear(512, 256),
    nn.LeakyReLU(),
    nn.Linear(256, 101)
)
# Move after replacing the head so the new classifier lands on the device too.
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.classifier.parameters(), lr=learning_rate, betas=(0.9, 0.999))
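Note the change of head between the two setups: the first classifier ends in LogSoftmax and is paired with NLLLoss, while this one emits raw logits and is paired with CrossEntropyLoss, which applies log-softmax internally. The two losses are equivalent, as a quick sanity check (my addition) shows:
# CrossEntropyLoss(logits) == NLLLoss(log_softmax(logits)) -- sanity check.
logits = torch.randn(4, 101, device=device)
targets = torch.randint(0, 101, (4,), device=device)
a = F.cross_entropy(logits, targets)
b = F.nll_loss(F.log_softmax(logits, dim=1), targets)
print(torch.allclose(a, b))  # True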
def train(num_epochs, train_loader, test_loader, model, optimizer, criterion):
    """
    Trains the model, logging metrics to TensorBoard, and saves checkpoints.
    """
    # Record loss and accuracy in TensorBoard. The default logdir is runs/.
    writer = SummaryWriter()
    os.makedirs('models/temp', exist_ok=True)  # checkpoint directories must exist
    start = time.time()
    # Initialize tracker for the minimum test loss.
    test_loss_min = float('inf')
    step = 0
    total_step = len(train_loader)
    for epoch in range(num_epochs):
        ################################################
        # Training Mode
        ################################################
        model.train()
        train_loss = 0
        for i, data in enumerate(train_loader):
            inputs, labels = data[0].to(device), data[1].to(device)
            # Forward
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # Backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Loss/Accuracy
            train_loss += loss.item()
            _, argmax = torch.max(outputs, 1)
            accuracy = (labels == argmax.squeeze()).float().mean()
            # Print train accuracy/loss every 100 steps.
            if (i+1) % 100 == 0:
                time_cost = time.time() - start
                print('Train: Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {}, Time: {}'.format(
                    epoch+1, num_epochs, i+1, total_step, loss.item(), accuracy.item(), time_cost))
                writer.add_scalar('Loss/train', loss.item(), step)
                writer.add_scalar('Accuracy/train', accuracy, step)
                step += 100
        # Save the newest model each epoch. Together with the test-accuracy log,
        # this lets us recover the best model from the run's history.
        torch.save(model.state_dict(), 'models/temp/food101_{}.pth'.format(time.time()))
        ################################################
        # Testing Mode
        ################################################
        model.eval()
        test_loss = 0
        test_accuracy = 0
        with torch.no_grad():
            for i, data in enumerate(test_loader):
                # For a quick smoke test, only evaluate the first 30 batches:
                # if i == 30: break
                images, labels = data[0].to(device), data[1].to(device)
                # Forward
                outputs = model(images)
                batch_loss = criterion(outputs, labels)
                test_loss += batch_loss.item()
                # Get the top-k predictions.
                top_p, top_class = outputs.topk(5, dim=1)
                # Reshape labels from [n] to [n, 1] so they broadcast against [n, 5].
                labels = labels.view(-1, 1)
                # Is the right answer anywhere in the top k?
                results = top_class == labels  # e.g. tensor([[False, False, False, True, False], ...])
                # Each label occurs at most once per row, so the row sum is 0 or 1.
                results = results.sum(1)
                # Accumulate per-batch accuracy; divide by the batch count at the end.
                test_accuracy += torch.mean(results.float()).item()
        # Average accuracy/loss for the epoch
        avg_test_accuracy = test_accuracy / (i+1)
        avg_test_loss = test_loss / (i+1)
        time_cost = time.time() - start
        print('Test: Epoch [{}/{}], Loss: {:.4f}, Accuracy Top5: {:.3%}, Time: {}'.format(
            epoch+1, num_epochs, avg_test_loss, avg_test_accuracy, time_cost))
        # Record testing results in TensorBoard.
        writer.add_scalar('Loss/test', avg_test_loss, epoch)
        writer.add_scalar('Accuracy/test', avg_test_accuracy, epoch)
        # Save the model whenever the average loss improves.
        if avg_test_loss <= test_loss_min:
            print("Saved Model with Lower Loss")
            torch.save(model.state_dict(), 'models/food101_less_loss.pth')
            test_loss_min = avg_test_loss
        # End testing mode
        model.train()
train(num_epochs, train_loader, test_loader, model, optimizer, criterion)
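The SummaryWriter above logs to the default runs/ directory, so the training and testing curves can be watched live with `tensorboard --logdir runs` from the project root.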
Here we always save the most recent model.
torch.save(model.state_dict(), 'models/temp/food101_{}.pth'.format(time.time()))
# Most recent checkpoint: models/temp/food101_<timestamp>.pth
# model.load_state_dict(torch.load('models/food101_model01.pth', map_location=device))
# Model with the lowest loss so far: food101_less_loss.pth
model.load_state_dict(torch.load('models/food101_less_loss.pth', map_location=device))
model.eval()
start = time.time()
test_loss = 0
test_accuracy = 0
with torch.no_grad():
    for i, data in enumerate(test_loader):
        # if i == 30: break
        images, labels = data[0].to(device), data[1].to(device)
        # Forward
        outputs = model(images)
        batch_loss = criterion(outputs, labels)
        test_loss += batch_loss.item()
        # Top-k predictions
        top_p, top_class = outputs.topk(5, dim=1)
        # Reshape labels from [n] to [n, 1] so they broadcast against [n, 5].
        labels = labels.view(-1, 1)
        # Is the right answer in the top k? Row sum is 0 or 1.
        results = (top_class == labels).sum(1)
        # Running average accuracy / loss
        test_accuracy += torch.mean(results.float()).item()
        avg_test_accuracy = test_accuracy / (i+1)
        avg_test_loss = test_loss / (i+1)
        time_cost = time.time() - start
        print('Test: Step [{}/{}], Loss: {:.4f}, Accuracy Top5: {:.3%}, Time: {}'.format(
            i+1, len(test_loader), avg_test_loss, avg_test_accuracy, time_cost))
# End testing
model.train()
print('Finish Testing')
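The header quotes both Top-1 and Top-5 accuracy, but the loop above only measures Top-5. A sketch of the corresponding Top-1 measurement (my addition, following the same pattern):
# Measure Top-1 accuracy over the test set.
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for data in test_loader:
        images, labels = data[0].to(device), data[1].to(device)
        outputs = model(images)
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
print('Top-1 accuracy: {:.3%}'.format(correct / total))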
from PIL import Image
# list of class names by index, i.e. a name can be accessed like class_names[0]
#class_names = [item[4:] for item in train_data.classes]
def predict(img_path, topk=5):
    """
    Given the path to an image, predict its class.
    """
    model.eval()
    # Preprocess the image into the format the model expects.
    img = Image.open(img_path)
    img = test_transforms(img)
    # img: [3, 224, 224]; the model expects a batch dimension [n, 3, 224, 224],
    # so add one at the front.
    img = torch.unsqueeze(img, 0)
    # Predict the top-k classes. exp of log-probabilities gives probabilities
    # (LogSoftmax head); with raw logits it is merely a monotonic rescaling,
    # so the top-k ranking is unchanged either way.
    outputs = torch.exp(model(img.to(device)))
    top_p, top_class = outputs.topk(topk, dim=1)
    pred = top_class.data.cpu().numpy()[0]
    results = [train_data.classes[i] for i in pred]
    return results
diy_path = os.path.join(data_path, 'test/apple_pie')
for img_name in os.listdir(diy_path):
    if '.DS_Store' in img_name:
        continue
    img_path = os.path.join(diy_path, img_name)
    img = cv2.imread(img_path)
    plt.imshow(img[..., ::-1])  # convert BGR to RGB
    plt.xticks([])
    plt.yticks([])
    plt.show()
    results = predict(img_path)
    print('Predict:', results)