# 模型训练 (model training)
import pandas as pd
import numpy as np
from gensim.models import KeyedVectors
import jieba
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models as models # 调用预训练模型
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import cosine_similarity
# Load the annotation CSV for attachment 1; each row pairs an image id with a caption.
data1 = pd.read_csv(r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件1\ImageWordData.csv")
# 1. Extract the caption column (Series) as a plain Python list.
texts1_data = list(data1['caption'])
# 2、文本预处理
def pre_processsing(texts):
    """Tokenize every caption with jieba.

    Parameters: texts -- iterable of caption strings.
    Returns: list of token lists, one per caption.
    """
    return [list(jieba.cut(sentence)) for sentence in texts]
# Tokenize all captions with the preprocessing function above.
texts1_data = pre_processsing(texts1_data)
# 3. Load the locally saved pretrained model --- Tencent AI Lab 200-d Chinese word vectors.
file = 'Tencent_AILab_ChineseEmbedding-200.bin'
# print(os.path.exists(file))  # check that the model file exists locally
word_model = KeyedVectors.load(file)
# Store the vocabulary as a word -> vector dictionary.
word_embeddings = {}
for idx, word in enumerate(word_model.index_to_key):
    word_embeddings[word] = np.array(word_model.vectors[idx])
# 2. Extract the image ids.
image1_data = list(data1['image_id'])
# Split into training and validation sets with an 8:2 ratio.
image_train, image_test, text_train, text_test = train_test_split(
    image1_data, texts1_data, test_size=0.2, random_state=42)
# Image preprocessing pipeline (torchvision): resize to 224x224, convert to a
# tensor, and normalize with the standard ImageNet channel statistics.
preprocesssing_images = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size=[224, 224]),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225]),
])
# 卷积神经网络对图片特征提取
class ImageEncoder(nn.Module):
    """Encode an image batch (batch, 3, 224, 224) into (batch, output_dim).

    Uses a pretrained ResNet-18 as a frozen feature extractor; only the
    projection layer `self.fc` is trainable.
    """

    def __init__(self, output_dim=128):
        super(ImageEncoder, self).__init__()
        # Pretrained ResNet-18 backbone (ImageNet weights).
        self.cnn = models.resnet18(pretrained=True)
        # Projection head: ResNet-18's final feature size is 512.
        self.fc = nn.Linear(in_features=512, out_features=output_dim)

    def forward(self, x):
        # The backbone runs under no_grad, i.e. it is frozen: no gradients
        # flow into the ResNet weights during training.
        with torch.no_grad():
            # Stem: conv -> batch norm -> ReLU -> max pool.
            x = self.cnn.conv1(x)
            x = self.cnn.bn1(x)
            x = self.cnn.relu(x)
            x = self.cnn.maxpool(x)
            # Four residual stages extract progressively deeper features.
            x = self.cnn.layer1(x)
            x = self.cnn.layer2(x)
            x = self.cnn.layer3(x)
            x = self.cnn.layer4(x)
            # Global average pool and flatten to (batch, 512).
            x = F.adaptive_avg_pool2d(x, (1, 1))
            x = x.view(x.size(0), -1)
        # Bug fix: keep the projection OUTSIDE the no_grad block so that
        # self.fc actually receives gradients and can be trained.
        x = self.fc(x)
        return x
# Recurrent network that turns a word-vector sequence into a text embedding.
class TextEncoder(nn.Module):
    """Encode a word-vector sequence (batch, seq_len, 200) into (batch, out_dim)."""

    def __init__(self, out_dim=128):
        super(TextEncoder, self).__init__()
        # 200-d inputs match the Tencent AI Lab word vectors; 128 hidden units.
        self.rnn = nn.LSTM(200, 128, batch_first=True)
        self.fc = nn.Linear(in_features=128, out_features=out_dim)

    def forward(self, x):
        # h_n has shape (num_layers, batch, hidden).
        _, (h_n, _) = self.rnn(x)
        # Bug fix: the original used h_n.squeeze(1), which squeezes the BATCH
        # dimension and only happens to produce the right shape when the batch
        # size is 1. Taking the last layer's hidden state gives (batch, hidden)
        # for any batch size (identical result for batch size 1).
        h = h_n[-1]
        # Project the final hidden state into the shared embedding space.
        return self.fc(h)
# Image-text feature fusion model.
class Multimodel(nn.Module):
    """Fuse one image embedding and one text embedding into a single 128-d vector."""

    def __init__(self, out_dim=128):
        super(Multimodel, self).__init__()
        # Image branch.
        self.image_encoder = ImageEncoder(out_dim)
        # Text branch.
        self.text_encoder = TextEncoder(out_dim)
        # Fusion layer over the concatenated embeddings -> (batch, 128).
        self.fc = nn.Linear(2 * out_dim, 128)

    def forward(self, image, text):
        # Per-modality embeddings, each (batch, out_dim).
        img_vec = self.image_encoder(image)
        txt_vec = self.text_encoder(text)
        # Concatenate along the feature axis and project through the fusion layer.
        fused = torch.cat((img_vec, txt_vec), dim=1)
        return self.fc(fused)
if __name__ == '__main__':
    # Bug fix: the guard was `'__main'` (missing trailing underscores), so the
    # training code below never ran when the script was executed directly.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # ---- training configuration ----
    num_epochs = 50                       # number of epochs
    # Bug fix: move the model to the selected device — the inputs are sent to
    # `device` below, so a CPU-resident model crashes when CUDA is available.
    model = Multimodel()
    model = model.to(device)
    batch_size = 100                      # NOTE(review): unused — samples are fed one at a time
    criterion = nn.CosineEmbeddingLoss()  # cosine similarity as the loss metric
    lr = 0.001                            # learning rate
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # NOTE(review): the loss compares the first 64 and last 64 dimensions of the
    # *fused* output with a constant positive label (+1). With no negative pairs
    # the network can trivially drive the loss to zero by making both halves
    # identical, which would explain poor retrieval quality. Consider contrasting
    # image vs. text embeddings directly with sampled negative (label=-1) pairs.

    train_loss = []
    test_loss = []
    for epoch in range(num_epochs):
        print(f"第{epoch + 1}轮训练开始")
        running_loss1 = 0.0  # accumulated training loss for this epoch
        # ---- training pass ----
        for image, text in zip(image_train, text_train):
            image_path = r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件1\ImageData" + "\\" + image
            img = Image.open(image_path)
            # Convert to RGB if the image is in any other mode.
            if img.mode != 'RGB':
                img = img.convert('RGB')
            # Preprocess and add a batch dimension.
            img = preprocesssing_images(img).unsqueeze(0).to(device)
            # Look up a 200-d vector for every token present in the vocabulary.
            sentence = [torch.tensor(word_embeddings[word]) for word in text
                        if word in word_embeddings]
            if len(sentence) > 0:
                # Stack token vectors and add a batch dimension: (1, seq_len, 200).
                sentence = torch.stack(sentence).unsqueeze(0).to(device)
                optimizer.zero_grad()
                # Fused image-text representation.
                output = model(img, sentence)
                # Bug fix: removed the duplicated assignment `label = label = ...`.
                label = torch.tensor([[1]]).to(device)
                loss = criterion(output[:, :64], output[:, 64:], label)
                loss.backward()
                optimizer.step()
                running_loss1 += loss.item()
            else:
                # No token of this caption is in the vocabulary — report it.
                print(text)
        # Report the mean loss for this epoch.
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss1 / len(image_train)}")
        train_loss.append(running_loss1 / len(image_train))

        # ---- validation pass ----
        model.eval()  # evaluation mode (affects dropout / batch norm)
        running_loss2 = 0.0
        with torch.no_grad():  # no gradients needed for evaluation
            for image, text in zip(image_test, text_test):
                image_path = r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件1\ImageData" + "\\" + image
                img = Image.open(image_path)
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                img = preprocesssing_images(img).unsqueeze(0).to(device)
                sentence = [torch.tensor(word_embeddings[word]) for word in text
                            if word in word_embeddings]
                if len(sentence) > 0:
                    sentence = torch.stack(sentence).unsqueeze(0).to(device)
                    output = model(img, sentence)
                    # Cosine similarity between the two halves of the fused vector.
                    image_features = output[:, :64].cpu().numpy()
                    text_features = output[:, 64:].cpu().numpy()
                    similarities = cosine_similarity(image_features, text_features)
                    # Bug fix: accumulate a plain float instead of a 1x1 ndarray.
                    running_loss2 += float(1 - similarities)
        print(running_loss2 / len(image_test))
        test_loss.append(running_loss2 / len(image_test))
        model.train()  # back to training mode
    print(train_loss)
    print(test_loss)
    # Save the trained weights.
    # NOTE(review): the inference script loads 'image_text_model.pth' — keep the
    # save/load filenames consistent.
    torch.save(model.state_dict(), 'image_text--2.pth')
# Question: when testing on the test data, the top-5 most similar images are
# almost never correct — where is the bug?
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import pandas as pd
import csv
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec,KeyedVectors
import jieba
import gensim
import os
import torch.nn.functional as F
import torchvision
from collections import defaultdict
from Multimodel import ImageEncoder, TextEncoder, Multimodel
# Load the test captions (attachment 2).
text_df2 = pd.read_csv(r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件2\word_test.csv")
text_id = list(text_df2['text_id'])
texts = list(text_df2['caption'])
# 3. Load the locally saved pretrained model --- Tencent AI Lab 200-d Chinese word vectors.
file = 'Tencent_AILab_ChineseEmbedding-200.bin'
# print(os.path.exists(file))  # check that the model file exists locally
word_model = KeyedVectors.load(file)
# Store the vocabulary as a word -> vector dictionary.
word_embeddings = {w: np.array(word_model.vectors[i])
                   for i, w in enumerate(word_model.index_to_key)}
# Candidate image ids (attachment 2).
image_df = pd.read_csv(r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件2\image_data.csv")
image_id = list(image_df['image_id'])
# Image preprocessing pipeline: resize to 224x224, to-tensor, ImageNet normalization.
preprocesssing_images = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size=[224, 224]),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225]),
])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bug fix: the training script saves a *state_dict*, so torch.load() returns an
# OrderedDict and calling .eval() on it crashes. Rebuild the model and load the
# weights into it instead. (Also note the training script saved to
# 'image_text--2.pth' — the filenames must match.)
model = Multimodel()
model.load_state_dict(torch.load('image_text_model.pth', map_location=device))
model = model.to(device)
model.eval()
d2 = defaultdict(list)
# Per-text similarity scores against every candidate image.
similarity_ranking = []
result_image_id = []
similarity_list = defaultdict(list)
k = 1
with torch.no_grad():
    for t_id, text in zip(text_id, texts):
        if k >= 2:
            break  # NOTE(review): debug limiter — only the first text is processed; remove for a full run
        print(f'{k}/{len(texts)}')
        k += 1
        # Bug fix: training tokenized captions with jieba, but this loop
        # iterated the raw string character by character, so word-vector
        # lookups rarely matched — a likely cause of the bad top-5 results.
        tokens = list(jieba.cut(text))
        sentence_vec = [torch.tensor(word_embeddings[word], dtype=torch.float)
                        for word in tokens if word in word_embeddings]
        # Robustness: skip captions with no in-vocabulary token (torch.stack
        # on an empty list raises).
        if not sentence_vec:
            continue
        # Stack token vectors and add a batch dimension: (1, seq_len, 200).
        text_sequence = torch.stack(sentence_vec).unsqueeze(0).to(device)
        # Text-branch embedding.
        text_features = model.text_encoder(text_sequence).cpu().numpy()
        for img_id in image_id:
            image_path = r"E:\BaiduNetdiskDownload\B题-全部数据\B题-数据\附件2\ImageData" + '\\' + img_id
            print(k, image_path)
            image = Image.open(image_path).convert('RGB')
            image = preprocesssing_images(image).unsqueeze(0).to(device)
            # Image-branch embedding.
            image_features = model.image_encoder(image).cpu().numpy()
            similarity = cosine_similarity(text_features, image_features)
            # Store a plain float (not a 1x1 ndarray) so later sorting is unambiguous.
            similarity_list[t_id].append(float(similarity))
        print(similarity_list[t_id])
# Select the five most similar images for each text and write them to CSV.
result = []
for key, value_list in similarity_list.items():
    # Bug fix: the original used value_list.index(value) to recover the image
    # position, which returns the FIRST occurrence and associates the wrong
    # image when two scores are equal. Sorting (index, score) pairs keeps the
    # correct index attached to each score.
    ranked = sorted(enumerate(value_list), key=lambda iv: iv[1], reverse=True)
    for rank, (index, value) in enumerate(ranked[:5], start=1):
        # Bug fix: the original appended [key, image_id, rank] under the header
        # ['text_id', 'similarity_ranking', 'result_image_id'], swapping the
        # last two columns. Rows now match the header order.
        result.append([key, rank, image_id[index]])
# Store the results in result1.csv.
result_df = pd.DataFrame(result, columns=['text_id', 'similarity_ranking', 'result_image_id'])
result_df.to_csv('result1.csv', index=False)