题目描述:50k张图片作为训练集,提供10k张图片作为测试集,提供5.4k个约束条件(非冗余条件共4k个),以csv格式文件存储。该文件每行共3个数(记为i, j, l),用逗号分割,表示训练集id为i和id为j的图片的关系为l。l的取值为1表示同簇,l的取值为-1表示异簇。形如:1,2,1
2,4,-1
目前问题:聚类效果差,且训练 Loss 始终为负数。请各位提出修改建议,帮助排查和修复。
#最终版
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
# Dataset wrapping a flat folder of image files.
class ImageDataset(Dataset):
    """Yield every image file in `data_folder` as a (transformed) RGB image.

    Args:
        data_folder: directory containing the image files.
        transform: optional torchvision transform applied to each image.
    """

    def __init__(self, data_folder, transform=None):
        self.data_folder = data_folder
        # BUG FIX: os.listdir order is filesystem-dependent, but the final
        # CSV maps predictions to positional IDs — a stable (sorted) order
        # is required for a reproducible ID -> image association. Also skip
        # anything that is not a regular file (e.g. stray subdirectories).
        self.image_paths = sorted(
            name for name in os.listdir(data_folder)
            if os.path.isfile(os.path.join(data_folder, name))
        )
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_name = os.path.join(self.data_folder, self.image_paths[idx])
        # Convert to RGB so grayscale/palette images get 3 channels.
        img = Image.open(img_name).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img
# Convolutional feature extractor.
class Encoder(nn.Module):
    """Two conv blocks (conv -> ReLU -> 2x2 max-pool).

    Maps a (B, 3, H, W) batch to a (B, 32, H/4, W/4) feature map.
    """

    def __init__(self):
        super(Encoder, self).__init__()
        blocks = [
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        # Keep the attribute name `encoder` so state_dict keys are unchanged.
        self.encoder = nn.Sequential(*blocks)

    def forward(self, x):
        return self.encoder(x)
# Soft cluster-assignment head.
class ClusteringLayer(nn.Module):
    """DEC-style soft assignment via a Student's-t kernel.

    Holds one learnable centroid per cluster; forward() returns, for each
    input row, a normalized similarity distribution over the centroids.
    """

    def __init__(self, input_dim, num_clusters):
        super(ClusteringLayer, self).__init__()
        self.num_clusters = num_clusters
        self.weights = nn.Parameter(torch.Tensor(num_clusters, input_dim))
        self.reset_parameters()

    def reset_parameters(self):
        # Xavier init keeps the initial sample-to-centroid distances small.
        nn.init.xavier_uniform_(self.weights)

    def forward(self, x):
        flat = x.view(x.size(0), -1)
        # Squared Euclidean distance from each sample to each centroid:
        # (B, 1, D) - (1, K, D) broadcasts to (B, K, D), summed over D.
        diff = flat.unsqueeze(1) - self.weights.unsqueeze(0)
        sq_dist = diff.pow(2).sum(dim=2)
        # Student's-t kernel with one degree of freedom; the exponent
        # (1 + 1) / 2 == 1 reproduces the original formulation exactly.
        sim = (1.0 / (1.0 + sq_dist)).pow((1 + 1) / 2)
        # Normalize each row into a probability distribution over clusters.
        return sim / sim.sum(dim=1, keepdim=True)
# Full model: encoder followed by the soft clustering head.
class DeepClusteringNet(nn.Module):
    """Encoder + clustering layer.

    forward() returns a (B, num_clusters) matrix of soft assignments
    whose rows sum to 1.
    """

    def __init__(self, num_clusters):
        super(DeepClusteringNet, self).__init__()
        self.encoder = Encoder()
        # 32x32 inputs leave a 32-channel 8x8 map after the two pools.
        self.clustering_layer = ClusteringLayer(32 * 8 * 8, num_clusters)

    def forward(self, x):
        features = self.encoder(x)
        features = features.view(features.size(0), -1)
        # Record the flattened feature width (kept from the original code;
        # nothing visible in this file reads it back).
        self.encoder_output_dim = features.size(1)
        return self.clustering_layer(features)
# Train the clustering network against a uniform assignment target.
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    """Run `num_epochs` of training and print the per-epoch average loss.

    Args:
        model: network returning per-sample probability rows (sum to 1).
        train_loader: yields image batches (no labels).
        criterion: expected to be nn.KLDivLoss — it requires its *input*
            in log-space, which is why the log is taken below.
        optimizer: optimizer over model's parameters.
        num_epochs: number of passes over the training data.
    """
    model.train()
    # Derive the device from the model instead of relying on a module global.
    device = next(model.parameters()).device
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs in train_loader:
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            # Uniform distribution over clusters as the target.
            target = torch.ones_like(outputs) / outputs.size(1)
            # BUG FIX: nn.KLDivLoss expects log-probabilities as input.
            # Feeding raw probabilities is what made the loss negative
            # (KL divergence is non-negative when computed correctly).
            # The epsilon guards against log(0).
            loss = criterion(torch.log(outputs + 1e-10), target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_loader.dataset)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")
# Assign each test image to its highest-probability cluster.
def cluster_test_set(model, test_loader):
    """Return the argmax cluster index for every sample in `test_loader`.

    Args:
        model: trained network producing (B, num_clusters) scores.
        test_loader: yields image batches in a fixed (non-shuffled) order.

    Returns:
        list of cluster indices (numpy ints), one per sample, in loader order.
    """
    model.eval()
    # FIX: derive the device from the model instead of relying on a module
    # global, so the function works standalone.
    device = next(model.parameters()).device
    predictions = []
    with torch.no_grad():
        for inputs in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, pred = torch.max(outputs, 1)
            predictions.extend(pred.cpu().numpy())
    return predictions
# --- Script body: data loading, training, and export of cluster labels ---

# Dataset locations (hard-coded Windows paths).
train_data_folder = "C:\\Users\\marryhan\\Desktop\\image_train"
test_data_folder = "C:\\Users\\marryhan\\Desktop\\image_test"
# NOTE(review): the pairwise must-link / cannot-link constraints described in
# the problem statement live in this CSV, but nothing in this script ever
# reads it — the path is defined and then unused.
query_file = "C:\\Users\\marryhan\\Desktop\\query.csv"

# Both splits are resized to 32x32 so the encoder produces the 32x8x8
# feature map that ClusteringLayer's input_dim (32*8*8) assumes.
train_dataset = ImageDataset(train_data_folder, transform=transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
]))
test_dataset = ImageDataset(test_data_folder, transform=transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
]))

# Select compute device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data loaders: training is shuffled; the test loader keeps a fixed order so
# positional IDs in the output CSV are stable.
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Build the model.
num_clusters = 4
model = DeepClusteringNet(num_clusters).to(device)

# Loss and optimizer.
# NOTE(review): nn.KLDivLoss requires its *input* in log-space, while the
# model outputs raw probabilities — passing them straight to the criterion
# (as train_model above does) yields the reported always-negative loss.
criterion = nn.KLDivLoss(reduction='batchmean')  # 'batchmean' matches the mathematical KL definition
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model.
train_model(model, train_loader, criterion, optimizer, num_epochs=10)

# Cluster the test set.
test_predictions = cluster_test_set(model, test_loader)

# Save positional-ID -> cluster assignments to CSV.
results = pd.DataFrame({'ID': range(len(test_predictions)), 'Class': test_predictions})
results.to_csv('test_results.csv', index=False)