import numpy as np
import time
from matplotlib import pyplot as plt
import json
import copy
import os
import torch
from torch import nn
from torch import optim
from torchvision import transforms, models, datasets
# 展示图片数据
def im_convert(tensor):
""" 展示数据"""
image = tensor.to("cpu").clone().detach()
image = image.numpy().squeeze()
image = image.transpose(1, 2, 0) # 将H W C还原回去
image = image * np.array((0.229, 0.224, 0.225)) + np.array((0.485, 0.456, 0.406)) # 还原标准化,先乘再加
image = image.clip(0, 1)
return image
# 设置需要训练的网络
def set_parameter_requires_grad(model, feature_extracting):
if feature_extracting:
for param in model.parameters():
param.requires_grad = False
# 初始化resnet模型 冻结卷积层 构建全连接层
def initialize_resnet_model(feature_extract, use_pretrained=True):
model = models.resnet152(pretrained=use_pretrained) # 是否要下载预训练模型
set_parameter_requires_grad(model, feature_extract) # 冻结层不进行梯度更新
num_ftrs = model.fc.in_features # 模型全连接层的输入特征个数
model.fc = nn.Sequential(nn.Linear(num_ftrs, 5), nn.LogSoftmax(dim=1)) # 构建模型全连接层
return model
# 训练模块
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs, filename):
since = time.time() # 开始计时
best_acc = 0 # 保存最好精度
val_acc_history = [] # 保存验证集精度
train_acc_history = [] # 保存训练集精度
train_losses = [] # 保存训练集损失
valid_losses = [] # 保存验证集损失
best_model_wts = copy.deepcopy(model.state_dict()) # 加载预训练模型参数
LRs = [optimizer.param_groups[0]['lr']] # 获取学习率
# 使用GPU训练
if not torch.cuda.is_available():
print('CUDA is not available. Training on CPU ...')
else:
print('CUDA is available. Training on GPU ...')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device) # 模型传入GPU
# 开始训练
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch + 1, num_epochs))
print('-' * 10)
# 训练和验证
for phase in ['train', 'valid']:
if phase == 'train':
model.train() # 训练
else:
model.eval() # 验证
running_loss = 0.0
running_corrects = 0
# 把数据都取个遍
for inputs, labels in dataloaders[phase]:
# 数据传入GPU
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad() # 梯度清零
with torch.set_grad_enabled(phase == 'train'): # 只有训练的时候计算和更新梯度
# 预测三大件 计算结果 计算损失 梯度下降更新权重
outputs = model(inputs) # 计算预测结果
loss = criterion(outputs, labels) # 计算损失
if phase == 'train': # 训练阶段更新权重
loss.backward()
optimizer.step()
_, preds = torch.max(outputs, 1) # 返回模型计算概率最大的值及其索引 计算精度使用
# 计算损失
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
epoch_loss = running_loss / len(dataloaders[phase].dataset)
epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
time_elapsed = time.time() - since
print('Time elapsed {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
# 得到最好那次的模型 针对验证集计算
if phase == 'valid' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict()) # 拷贝模型的参数
state = {
'state_dict': model.state_dict(),
'best_acc': best_acc,
'optimizer': optimizer.state_dict(),
}
torch.save(state, filename) # 保存模型
if phase == 'valid':
val_acc_history.append(epoch_acc)
valid_losses.append(epoch_loss)
# scheduler.step(epoch_loss)
if phase == 'train':
train_acc_history.append(epoch_acc)
train_losses.append(epoch_loss)
print('Optimizer learning rate : {:.7f}'.format(optimizer.param_groups[0]['lr']))
LRs.append(optimizer.param_groups[0]['lr'])
print()
# 训练的总时间
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 训练完后用最好的一次当做模型最终的结果
model.load_state_dict(best_model_wts)
return model, val_acc_history, train_acc_history, valid_losses, train_losses, LRs
# 数据读取与预处理
# 数据集存放路径
data_dir = '/media/public/'
train_dir = data_dir + '/train' # 训练集
valid_dir = data_dir + '/valid' # 验证集
# 制作数据源 因为数据集里的图片大小都不一 一般网络都需要224*244的数据输入
data_transforms = {
'train': transforms.Compose([ # Compose()函数将所有步骤放在一起打包进行
transforms.Resize(256), # 首先要缩小成256*256的大小再进行裁剪
transforms.CenterCrop(224), # 从中心开始裁剪成224*224的图片
# 数据增强 验证集不需要此步骤
transforms.RandomRotation(45), # 随机旋转,-45到45度之间随机选
transforms.RandomHorizontalFlip(p=0.5), # 随机水平翻转 选择一个概率概率
transforms.RandomVerticalFlip(p=0.5), # 随机垂直翻转
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1), # 参数1为亮度, 参数2为对比度,参数3为饱和度,参数4为色相
transforms.RandomGrayscale(p=0.025), # 概率转换成灰度率, 3通道就是R=G=B
transforms.ToTensor(), # 图片数据转换为tensor格式
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # 标准差标准化 平衡各特征的贡献
]),
'valid': transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
batch_size = 32
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'valid']} # 制作数据集
data_loaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True) for x in ['train', 'valid']} # 制作batch数据
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'valid']} # 获取数据大小 训练集6552个 验证集818个
class_names = image_datasets['train'].classes # 获取类别名(数字标签)
# 获取标签对应的实际名字
with open('class_indices.json', 'w+') as f:
#cat_to_name = json.load(f)
cat_to_name = f.read()
# 训练三大件 初始化模型 初始化损失函数 初始化优化器
model_resnet = initialize_resnet_model(feature_extract=True, use_pretrained=True) # 加载resnet模型 冻结卷积层
# 优化器设置
optimizer_ft = optim.Adam(model_resnet.parameters(), lr=1e-2)
scheduler_ft = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) # 学习率每7个epoch衰减成原来的1/10
# 初始化损失函数
criterion = nn.NLLLoss() # nn.CrossEntropyLoss()相当于logSoftmax()和nn.NLLLoss() 最后一层构建全连接层时已经用了LogSoftmax()
# 预训练
filename = 'checkpoint.pth' # 模型保存路径
epoch_ft = 1
model_ft, val_acc_history_ft, train_acc_history_ft, valid_losses_ft, train_losses_ft, LRs_ft = train_model(
model_resnet, data_loaders, criterion, optimizer_ft, scheduler_ft, epoch_ft, filename)
# 再次训练 训练三大件 初始化模型 初始化损失函数 初始化优化器 此处损失函数不需要再初始化了
set_parameter_requires_grad(model_ft, False) # 不冻结卷积层
optimizer = optim.Adam(model_ft.parameters(), lr=1e-4) # 再继续训练所有的参数 学习率调小一点
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# 二次训练
epoch = 10
model, val_acc_history, train_acc_history, valid_losses, train_losses, LRs = train_model(
model_ft, data_loaders, criterion, optimizer, scheduler, epoch, filename)
# 测试训练好的模型
model_ft.eval()
# 加载模型
checkpoint = torch.load('checkpoint.pth')
best_acc = checkpoint['best_acc']
model_ft.load_state_dict(checkpoint['state_dict'])
# 加载数据 用batch传入数据进行测试
dataiter = iter(data_loaders['valid'])
images, labels = dataiter.next()
# 开始测试
if torch.cuda.is_available():
output = model_ft(images.cuda())
else:
output = model_ft(images)
_, preds_tensor = torch.max(output, 1) # 得出概率最大的值以及下标
preds = np.squeeze(preds_tensor.numpy()) if not torch.cuda.is_available() else np.squeeze(preds_tensor.cpu().numpy())
# 展示结果
fig = plt.figure(figsize=(20, 20))
columns = 4
rows = 2
for idx in range(columns*rows):
ax = fig.add_subplot(rows, columns, idx+1, xticks=[], yticks=[])
plt.imshow(im_convert(images[idx]))
ax.set_title("{} ({})".format(cat_to_name[int(preds[idx])], cat_to_name[int(labels[idx].item())]),
color=("green" if cat_to_name[int(preds[idx])] == cat_to_name[int(labels[idx].item())] else "red"))
plt.show()
报错