# NOTE: contrastive pretraining only reaches ~50% accuracy here — about the same as plain supervised training.
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Dataset
from torchvision import models
from tqdm import tqdm
from torchvision.transforms import autoaugment
import numpy as np
# ----------------------
# 1. Data preparation
# ----------------------
# Expected dataset layout:
# food11/
# ├── train/   (unlabeled data for unsupervised pretraining)
# ├── labeled/ (partially labeled data for supervised fine-tuning)
# └── test/
# Data augmentation for contrastive learning
class ContrastiveTransformations:
    """Callable that maps one PIL image to two independently augmented views.

    Standard SimCLR augmentation stack: random crop, horizontal flip,
    color jitter, Gaussian blur, then tensor conversion and ImageNet
    normalisation. Each call draws the stochastic pipeline twice.
    """

    def __init__(self):
        self.transform = transforms.Compose([
            transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(0.4, 0.4, 0.4, 0.1),
            transforms.GaussianBlur(kernel_size=23),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def __call__(self, x):
        # Two independent draws of the stochastic pipeline -> two views.
        return [self.transform(x) for _ in range(2)]
# Dataset for unsupervised pretraining: yields two augmented views per image.
class UnlabeledDataset(Dataset):
    """Wraps an ImageFolder, discards the label, and returns (view1, view2)."""

    def __init__(self, root_dir):
        # ContrastiveTransformations produces a [view1, view2] list per image.
        self.dataset = ImageFolder(root=root_dir, transform=ContrastiveTransformations())

    def __getitem__(self, index):
        # ImageFolder yields (transformed_image, label); keep only the views.
        views, _ = self.dataset[index]
        return views[0], views[1]

    def __len__(self):
        return len(self.dataset)
HW = 224  # input side length (pixels) used by the supervised fine-tuning transforms
# Dataset for the supervised fine-tuning stage (images with class labels).
class LabeledDataset(Dataset):
    """ImageFolder wrapper applying light train-time augmentation + normalisation."""

    def __init__(self, root_dir):
        augmentation = transforms.Compose([
            transforms.RandomResizedCrop(HW),
            transforms.RandomHorizontalFlip(),
            #autoaugment.AutoAugment(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.dataset = ImageFolder(root=root_dir, transform=augmentation)

    def __getitem__(self, index):
        # Delegates straight to ImageFolder: (image_tensor, class_index).
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)
# Build the datasets and DataLoaders (paths point at a local food-11 copy).
train_unlabeled = UnlabeledDataset(root_dir=r'D:\BaiduNetdiskDownload\第四五节_分类代码 (2)\food_classification\food-11\training\unlabeled')
train_labeled = LabeledDataset(root_dir=r'D:\BaiduNetdiskDownload\第四五节_分类代码 (2)\food_classification\food-11\training\labeled')
test_dataset = LabeledDataset(root_dir=r'D:\BaiduNetdiskDownload\第四五节_分类代码 (2)\food_classification\food-11\validation')
batch_size = 32
#num_workers = 4
# NOTE(review): no num_workers/pin_memory configured, so image decoding runs
# in the main process — likely an input-pipeline bottleneck; confirm on target machine.
train_unlabeled_loader = DataLoader(
    train_unlabeled, batch_size=batch_size, shuffle=True
)
train_labeled_loader = DataLoader(
    train_labeled, batch_size=batch_size, shuffle=True
)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
# ----------------------
# 2. Model definitions
# ----------------------
# SimCLR model (unsupervised pretraining)
class Encoder(nn.Module):
    """Four-stage plain CNN encoder: each stage is Conv3x3 -> BN -> ReLU -> MaxPool(2).

    For a 224x224 input the spatial size halves five times
    (224 -> 112 -> 56 -> 28 -> 14 -> 7), so the flattened feature has
    512 * 7 * 7 = 25088 values — matching the Projection head's input_dim.

    The child module names (layer0..layer3, pool1, flatten) and the internal
    Sequential layout are kept identical to the original so previously saved
    state_dicts still load.
    """

    def __init__(self, base_encoder=None):
        # `base_encoder` is accepted for backward compatibility but was never
        # used; the original default (torchvision's models.resnet50) created a
        # dead dependency, so it is now None.
        super().__init__()
        self.layer0 = self._block(3, 64)      # 224 -> 112
        self.layer1 = self._block(64, 128)    # 112 -> 56
        self.layer2 = self._block(128, 256)   # 56 -> 28
        self.layer3 = self._block(256, 512)   # 28 -> 14
        self.pool1 = nn.MaxPool2d(2)          # 14 -> 7
        self.flatten = nn.Flatten()

    @staticmethod
    def _block(in_channels, out_channels):
        """One Conv3x3 (stride 1, pad 1) -> BatchNorm -> ReLU -> MaxPool(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )

    def forward(self, x):
        """Encode a (B, 3, H, W) batch into a (B, 512 * H/32 * W/32) feature."""
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.pool1(x)
        return self.flatten(x)
# Custom projection head
class Projection(nn.Module):
    """Two-layer MLP projection head; outputs are L2-normalised to unit length."""

    def __init__(self, input_dim=25088, hidden_dim=2048, output_dim=128):
        super().__init__()
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),  # normalise the hidden activations
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, x):
        projected = self.mlp(x)
        # Unit-normalise so cosine similarity reduces to a plain dot product.
        return nn.functional.normalize(projected, dim=1)
# Full contrastive model: encoder + projection head
class CustomSimCLR(nn.Module):
    """SimCLR model = feature encoder followed by a projection head."""

    def __init__(self, encoder, projection_head):
        super().__init__()
        # Backbone that extracts features (e.g. the custom CNN Encoder).
        self.encoder = encoder
        # Head that maps features into the contrastive embedding space.
        self.projection_head = projection_head

    def forward(self, x):
        # Encode, then project into the (normalised) contrastive space.
        return self.projection_head(self.encoder(x))
# Instantiate the backbone and projection head used for SimCLR pretraining.
encoder = Encoder()
# 25088 = 512 channels * 7 * 7 spatial positions for a 224x224 input.
projection_head = Projection(input_dim=25088, hidden_dim=2048, output_dim=128)
# Classification model (supervised fine-tuning)
class FineTuneModel(nn.Module):
    """Classifier for supervised fine-tuning: pretrained encoder + 2-layer head.

    Attribute names (encoder, fc, relu1, fc2) are preserved so saved
    state_dicts remain loadable.
    """

    def __init__(self, pretrained_encoder, num_classes=11):
        super().__init__()
        # Reuse the backbone from the SimCLR model; its projection head is dropped.
        self.encoder = pretrained_encoder.encoder
        # Classification head: 25088-dim encoder feature -> 512 -> num_classes.
        self.fc = nn.Linear(25088, 512)
        self.relu1 = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        features = self.encoder(x)
        hidden = self.relu1(self.fc(features))
        return self.fc2(hidden)
def nt_xent_loss(z1, z2, temperature=0.1):
    """NT-Xent (normalised temperature-scaled cross-entropy) loss from SimCLR.

    Args:
        z1, z2: (N, D) L2-normalised projections of two augmented views of the
            same N images; row i of z1 and row i of z2 form a positive pair.
        temperature: softmax temperature scaling the cosine logits.

    Returns:
        Scalar loss tensor.
    """
    n = z1.size(0)
    z = torch.cat([z1, z2], dim=0)                 # (2N, D)
    sim_matrix = torch.mm(z, z.t()) / temperature  # pairwise cosine logits
    # BUG FIX: the previous version "excluded" self-similarity by multiplying
    # with a 0/1 mask, which only zeroes the self logit; exp(0) = 1 still
    # entered every softmax denominator and biased the loss. Setting the
    # diagonal to -inf removes self-pairs from the softmax entirely.
    sim_matrix.fill_diagonal_(float('-inf'))
    # The positive of row i (i < n) is row i + n, and vice versa.
    # Target tensor is placed on z's device (the old code read a global
    # `device`, which crashed when the function was used outside __main__).
    targets = torch.cat([torch.arange(n) + n, torch.arange(n)], dim=0).to(z.device)
    return torch.nn.functional.cross_entropy(sim_matrix, targets)
# ----------------------
# 3. Unsupervised pretraining
# ----------------------
# ... (code above unchanged)
# Entry point: (previously run) SimCLR pretraining, then supervised fine-tuning.
if __name__ == '__main__':
    # ----------------------
    # 3. Unsupervised pretraining
    # ----------------------
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    #model = CustomSimCLR(encoder, projection_head).to(device)
    #optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4,weight_decay=1e-4)
    '''
    # 预训练循环
    num_epochs = 50
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for view1, view2 in tqdm(train_unlabeled_loader, desc=f'Epoch {epoch + 1}'):
            view1, view2 = view1.to(device), view2.to(device)
            optimizer.zero_grad()
            z1 = model(view1)
            z2 = model(view2)
            loss = nt_xent_loss(z1, z2)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_unlabeled_loader)
        print(f'Pretrain Epoch {epoch + 1}, Loss: {avg_loss:.4f}')
    # 保存预训练权重
    torch.save(model.state_dict(), 'simclr_pretrained.pth')
    '''
    # ----------------------
    # 4. Supervised fine-tuning
    # ----------------------
    # Load the pretrained SimCLR weights (file must exist from an earlier
    # pretraining run, otherwise torch.load raises FileNotFoundError).
    pretrained_model = CustomSimCLR(encoder, projection_head)
    pretrained_model.load_state_dict(torch.load('simclr_pretrained.pth'))
    finetune_model = FineTuneModel(pretrained_model).to(device)
    # Optionally freeze the encoder so only the classifier head trains:
    #for param in finetune_model.encoder.parameters():
    #param.requires_grad = False
    optimizer = torch.optim.AdamW(finetune_model.parameters(), lr=1e-4, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()
    # Fine-tuning loop: track the best test accuracy and checkpoint on improvement.
    num_finetune_epochs = 20
    best_acc = 0.0
    for epoch in range(num_finetune_epochs):
        finetune_model.train()
        total_loss, total_correct = 0.0, 0
        for images, labels in tqdm(train_labeled_loader, desc=f'Finetune Epoch {epoch + 1}'):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = finetune_model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            total_correct += torch.sum(preds == labels.data).item()
        # Evaluation on the held-out set (no gradients, eval-mode BatchNorm).
        finetune_model.eval()
        test_correct = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = finetune_model(images)
                _, preds = torch.max(outputs, 1)
                test_correct += torch.sum(preds == labels.data).item()
        train_acc = total_correct / len(train_labeled.dataset)
        test_acc = test_correct / len(test_dataset)
        print(f'Epoch {epoch + 1}, Train Loss: {total_loss / len(train_labeled_loader):.4f}, '
              f'Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
        # Keep only the best checkpoint by test accuracy.
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(finetune_model.state_dict(), 'best_finetune_model.pth')
    print(f'Best Test Accuracy: {best_acc:.4f}')
# TODO(review): investigate how to raise accuracy further — SimCLR pretraining on food-11 is expected to reach ~70%, not 50%.