Can the custom neural-network model below be saved as an h5 file? If not, can it be converted into a Keras network structure and then stored with model.save? If that is possible, please also give the model.fit training method.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from dialog_cut import process_cut
from question_answer import question_answer
import os
from datetime import datetime
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import io
import unicodedata
import re
from matplotlib.font_manager import FontProperties
font = FontProperties(fname="/Library/Fonts/Songti.ttc",size=8)
def source_data(source_path):
"""生成对话数据
参数:
source_path:
返回:
questions: 问题数据集
answers: 答案数据集
"""
# 获取完整对话
convs = process_cut(source_path, None)
# 获取问题和答案对话集
questions, answers = question_answer(convs)
return questions, answers
def tokenize(datas):
"""数据集处理为向量和字典
参数:
datas: 数据集列表
返回:
voc_li: 数据集向量
tokenizer: 数据集字典
"""
# 数据序列化为向量实例化
tokenizer = keras.preprocessing.text.Tokenizer(filters="")
tokenizer.fit_on_texts(datas)
    # Convert the text to index sequences
    voc_li = tokenizer.texts_to_sequences(datas)
    # Pad the sequences to the same length
    voc_li = keras.preprocessing.sequence.pad_sequences(
        voc_li, padding="post"
    )
    # Return the results
return voc_li, tokenizer
def max_length(vectors):
"""获取数据集最长对话
参数:
vectors: 词向量
返回:
最长对话单字量
"""
return max(len(vector) for vector in vectors)
def convert(index, vectors):
"""向量与单字对应关系
参数
index:字典
vectors:词向量
返回:
无
"""
for vector in vectors:
if vector != 0:
print("{}-->{}".format(vector, index.index_word[vector]))
class Encoder(tf.keras.Model):
"""编码器"""
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
        # Batch size
        self.batch_sz = batch_sz
        # Number of encoder units
        self.enc_units = enc_units
        # Word-embedding layer
        self.embedding = keras.layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU layer
self.gru = keras.layers.GRU(
self.enc_units,
return_sequences=True,
return_state=True,
recurrent_initializer="glorot_uniform"
)
@tf.function
def call(self, x, hidden):
"""编码器输出"""
x = self.embedding(x)
output, state = self.gru(x, initial_state=hidden)
return output, state
def initialize_hidden_state(self):
"""初始化隐藏层状态"""
return tf.zeros((self.batch_sz, self.enc_units))
class BahdanauAttentionMechanism(tf.keras.layers.Layer):
"""Bahdanau注意力机制"""
def __init__(self, units):
super(BahdanauAttentionMechanism, self).__init__()
        # Dense layer 1
        self.W1 = layers.Dense(units)
        # Dense layer 2
        self.W2 = layers.Dense(units)
        # Scoring (output) layer
self.V = layers.Dense(1)
@tf.function
def call(self, query, values):
"""权重计算
参数:
query: 向量
values: 隐藏层值
返回:
词向量
词向量权重
"""
hidden_with_time_axis = tf.expand_dims(query, 1)
        # Attention score for each encoder time step
score = self.V(
tf.nn.tanh(
self.W1(values)+self.W2(hidden_with_time_axis)
)
)
        # Attention weights
        attention_weights = tf.nn.softmax(score, axis=1)
        # Context vector: attention-weighted sum of the encoder outputs
context_vector = attention_weights * values
context_vector = tf.math.reduce_sum(context_vector, axis=1)
return context_vector, attention_weights
class Decoder(tf.keras.Model):
"""解码器"""
def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
super(Decoder, self).__init__()
        # Batch size
        self.batch_sz = batch_sz
        # Number of decoder units
        self.dec_units = dec_units
        # Word-embedding layer
        self.embedding = layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU layer
        self.gru = layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform"
        )
        # Fully connected output layer
        self.fc = layers.Dense(vocab_size)
        # Attention mechanism
self.attention = BahdanauAttentionMechanism(self.dec_units)
@tf.function
def call(self, x, hidden, enc_output):
"""解码计算
参数:
x: 隐藏层输入
hidden: 隐藏层状态
enc_output: 编码器输出
返回:
x: 解码器输出
state: 隐藏层状态
attention_weights: 注意力权重
"""
        # Context vector and attention weights
        context_vector, attention_weights = self.attention(
            hidden,
            enc_output)
        # Word embedding
x = self.embedding(x)
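        # Concatenate the context vector with the embedded token:
        # shape (batch, 1, embedding_dim + units)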
x = tf.concat([tf.expand_dims(context_vector, 1), x],axis=-1)
        # GRU computation
output, state = self.gru(x)
        # Output
output = tf.reshape(output, (-1, output.shape[2]))
x = self.fc(output)
return x, state, attention_weights
def loss(real, pred):
"""损失值计算
参数:
标签值(对话语料答案)
预测值(解码器输出答案)
返回:
损失值
"""
    # Mask out the padding positions (token id 0)
    mask = tf.math.logical_not(
        tf.math.equal(real, 0)
    )
    # Loss-function object
    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction="none"
    )
    # Compute the loss value
    loss_value = loss_obj(real, pred)
    mask = tf.cast(mask, dtype=loss_value.dtype)
    loss_value *= mask
    # Return the mean of the masked loss values
return tf.math.reduce_mean(loss_value)
def grad_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE):
"""计算损失函数值并获取梯度优化对象
参数:
q: 问题
a: 答案
q_hidden: 编码器隐藏层输出
encoder: 编码器对象
decoder: 解码器对象
q_index: 问题字典
BATCH_SIZE: 批量数据尺寸
返回:
批量数据损失值
梯度优化对象
"""
loss_value = 0
with tf.GradientTape() as tape:
q_output, q_hidden = encoder(q, q_hidden)
a_hidden = q_hidden
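        # First decoder input: the <start> token for every sequence in the batch.
        # Note: a_index is the answer tokenizer from the module scope; the q_index parameter is unused here.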
a_input = tf.expand_dims(
[a_index.word_index["<start>"]]*BATCH_SIZE,1)
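        # Teacher forcing: the ground-truth answer token, not the prediction, is fed back as the next decoder input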
for vector in range(1, a.shape[1]):
predictions, a_hidden, _ = decoder(a_input, a_hidden, q_output)
loss_value += loss(a[:,vector], predictions)
a_input = tf.expand_dims(a[:, vector],1)
batch_loss = (loss_value / int(a.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables
return batch_loss, tape.gradient(loss_value, variables)
def optimizer_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer):
"""优化失函数
参数:
q: 问题
a: 答案
q_hidden: 编码器隐藏层输出
encoder: 编码器对象
decoder: 解码器对象
q_index: 问题字典
BATCH_SIZE: 批量数据尺寸
optimizer: 优化器
返回:
批量数据损失值
"""
# optimizer = tf.keras.optimizers.Adam()
batch_loss, grads = grad_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE)
variables = encoder.trainable_variables + decoder.trainable_variables
optimizer.apply_gradients(zip(grads, variables))
return batch_loss
def train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix,summary_writer):
"""训练模型
参数:
q_hidden: 编码器隐藏层输出
encoder: 编码器对象
decoder: 解码器对象
q_index: 问题字典
BATCH_SIZE: 批量数据尺寸
dataset: 问答语料数据集
steps_per_epoch: 每轮训练迭代次数
optimizer: 优化器
checkpoint: 模型保存类对象
checkpoint_prefix: 模型保存路径
summary_writer: 日志保存对象
返回:
无
"""
    # Model-save counter
    i = 0
    # Number of training epochs
    EPOCHS = 200
    # Training loop
    for epoch in range(EPOCHS):
        # Start time
        start = time.time()
        # Initialize the hidden state
        a_hidden = encoder.initialize_hidden_state()
        # Total loss
        total_loss = 0
        # Iterate over the question/answer dataset
        for (batch, (q, a)) in enumerate(dataset.take(steps_per_epoch)):
            # Batch loss
batch_loss = optimizer_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer)
            # Accumulate the total loss
total_loss += batch_loss
with summary_writer.as_default():
tf.summary.scalar("batch loss", batch_loss.numpy(), step=epoch)
            # Print the result every 100 batches
if batch % 100 == 0:
print("第{}次训练,第{}批数据损失值:{:.4f}".format(
epoch+1,
batch+1,
batch_loss.numpy()
))
        # Log the epoch loss and save the model every 100 epochs
with summary_writer.as_default():
tf.summary.scalar("total loss", total_loss/steps_per_epoch,step=epoch)
if(epoch+1) % 100 == 0:
i += 1
print("====第{}次保存训练模型====".format(i))
checkpoint.save(file_prefix=checkpoint_prefix)
print("第{}次训练,总损失值:{:.4f}".format(epoch+1, total_loss/steps_per_epoch))
print("训练耗时:{:.1f}秒".format(time.time()-start))
def preprocess_question(question):
"""问题数据集处理,添加开始和结束标志
参数:
question: 问题
返回:
处理后的问题
"""
question = "<start> " + " ".join(question) + " <end>"
return question
def answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
"""答案向量解码
参数
question: 问题
a_max_len: 答案最大长度
q_max_len: 问题最大长度
q_index: 问题字典
a_index: 答案索引
encoder: 编码器对象
decoder: 解码器对象
返回
result: 答案向量解码后的答案
question: 问题
attention_plot: 词向量权重
"""
    # Initialize the attention-weight matrix
    attention_plot = np.zeros((a_max_len, q_max_len))
    # Preprocess the question
    question = preprocess_question(question)
    # Convert the question to an index sequence
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # Pad the question sequence
inputs = keras.preprocessing.sequence.pad_sequences(
[inputs],
maxlen=q_max_len,
padding="post"
)
    # Convert the question to a tensor
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # Hidden state (units is the module-level hidden size)
    hidden = [tf.zeros((1, units))]
    # Encoder output and hidden state
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # Expand the decoder input dimensions
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # Decode step by step
for t in range(a_max_len):
predictions, a_hidden, attention_weights = decoder(
a_input,
a_hidden,
q_out
)
        # Attention weights
attention_weights = tf.reshape(attention_weights, (-1,))
attention_plot[t] = attention_weights.numpy()
        # Index of the predicted token
predicted_id = tf.argmax(predictions[0]).numpy()
        # Append the predicted token unless it is the <end> marker
        if a_index.index_word[predicted_id] != "<end>":
            result += a_index.index_word[predicted_id]
else:
return result, question, attention_plot
        # Feed the prediction back as the next decoder input
a_input = tf.expand_dims([predicted_id], 0)
    # Return the results
return result, question, attention_plot
def answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
"""答案向量解码
参数
question: 问题
a_max_len: 答案最大长度
q_max_len: 问题最大长度
q_index: 问题字典
a_index: 答案索引
encoder: 编码器对象
decoder: 解码器对象
返回
result: 答案向量解码后的答案
question: 问题
attention_plot: 词向量权重
"""
    # Initialize the attention-weight matrix
    attention_plot = np.zeros((a_max_len, q_max_len))
    # Preprocess the question
    question = preprocess_question(question)
    # Convert the question to an index sequence
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # Pad the question sequence
inputs = keras.preprocessing.sequence.pad_sequences(
[inputs],
maxlen=q_max_len,
padding="post"
)
    # Convert the question to a tensor
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # Hidden state (units is the module-level hidden size)
    hidden = [tf.zeros((1, units))]
    # Encoder output and hidden state
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # Expand the decoder input dimensions
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # Decode step by step
for t in range(a_max_len):
predictions, a_hidden, attention_weights = decoder(
a_input,
a_hidden,
q_out
)
        # Attention weights
attention_weights = tf.reshape(attention_weights, (-1,))
attention_plot[t] = attention_weights.numpy()
        # Index of the predicted token
predicted_id = tf.argmax(predictions[0]).numpy()
        # Build the answer (tokens separated by spaces)
result += a_index.index_word[predicted_id]+" "
if a_index.index_word[predicted_id] == "<end>":
return result, question, attention_plot
        # Feed the prediction back as the next decoder input
a_input = tf.expand_dims([predicted_id], 0)
    # Return the results
return result, question, attention_plot
def plot_attention(attention, question, predicted):
"""绘制问题和答案混淆矩阵
参数:
attention:注意力参数
question: 问题
predicted: 预测值
返回:
无
"""
    # Create the figure
    fig = plt.figure(figsize=(6,6))
    # Add a subplot
    ax = fig.add_subplot(1,1,1)
    # Draw the attention matrix
    # ax.matshow(attention, cmap="viridis")
    ax.matshow(attention, cmap=plt.cm.Blues)
    # Font size
    fontdict={"fontsize":6}
    # X-axis labels (question tokens)
    ax.set_xticklabels([""]+question, fontdict=fontdict, rotation=90,fontproperties=font)
    # Y-axis labels (predicted tokens)
    ax.set_yticklabels([""]+predicted, fontdict=fontdict, fontproperties=font)
    # X-axis tick locator
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    # Y-axis tick locator
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
plt.savefig("./images/q_a_image.png", format="png", dpi=300)
plt.show()
def chat(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
"""对话
参数
question: 问题
a_max_len: 答案最大长度
q_max_len: 问题最大长度
q_index: 问题字典
a_index: 答案索引
encoder: 编码器对象
decoder: 解码器对象
返回
无
"""
result, question, attention_plot = answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
print("机器人:", result)
def chat_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
"""对话
参数
question: 问题
a_max_len: 答案最大长度
q_max_len: 问题最大长度
q_index: 问题字典
a_index: 答案索引
encoder: 编码器对象
decoder: 解码器对象
返回
无
"""
result, question, attention_plot = answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
print("机器人:", result)
attention_plot = attention_plot[:len(result.split(" ")),:len(question.split(" "))]
plot_attention(attention_plot, question.split(" "), result.split(" "))
if __name__ == "__main__":
stamp = datetime.now().strftime("%Y%m%d-%H:%M:%S")
source_path = "./data/source_data.conv"
    # Download the sample file (only used by the commented-out create_dataset example below)
path_to_zip = tf.keras.utils.get_file(
'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
extract=True)
path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"
# answers, questions = create_dataset(path_to_file, 24000)
# q_vec, q_index = tokenize(questions)
# a_vec, a_index = tokenize(answers)
questions, answers = source_data(source_path)
q_vec, q_index = tokenize(questions)
a_vec, a_index = tokenize(answers)
print("voc:", q_vec)
print("tokenize:", q_index.index_word)
print("voc:", a_vec)
print("tokenize:", a_index.index_word)
q_max_len = max_length(q_vec)
a_max_len = max_length(a_vec)
convert(q_index, q_vec[0])
BUFFER_SIZE = len(q_vec)
print("buffer size:", BUFFER_SIZE)
BATCH_SIZE = 64
steps_per_epoch = len(q_vec)//BATCH_SIZE
embedding_dim = 256
units = 1024
q_vocab_size = len(q_index.word_index)+1
a_vocab_size = len(a_index.word_index)+1
dataset = tf.data.Dataset.from_tensor_slices(
(q_vec, a_vec)
).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    # Dataset iteration test
# for(batch,(q, a)) in enumerate(dataset.take(steps_per_epoch)):
# print("batch:",batch)
# print("question:",q)
# print("answer:",a)
    # Normal training
q_batch, a_batch = next(iter(dataset))
print("question batch:",q_batch.shape)
print("answer batch:", a_batch.shape)
log_path = "./logs/chat"+stamp.replace(":","-")
summary_writer = tf.summary.create_file_writer(log_path)
tf.summary.trace_on(graph=True, profiler=True)
encoder = Encoder(
q_vocab_size,
embedding_dim,
units,
BATCH_SIZE)
q_hidden = encoder.initialize_hidden_state()
q_output, q_hidden = encoder(q_batch, q_hidden)
with summary_writer.as_default():
tf.summary.trace_export(name="chat-en", step=0, profiler_outdir=log_path)
tf.summary.trace_on(graph=True, profiler=True)
attention_layer = BahdanauAttentionMechanism(10)
attention_result, attention_weights = attention_layer(
q_hidden, q_output
)
with summary_writer.as_default():
tf.summary.trace_export(name="chat-atten", step=0, profiler_outdir=log_path)
tf.summary.trace_on(graph=True, profiler=True)
decoder = Decoder(
a_vocab_size,
embedding_dim,
units,
BATCH_SIZE
)
a_output, _, _ = decoder(
tf.random.uniform((64,1)),
q_hidden,
q_output
)
with summary_writer.as_default():
tf.summary.trace_export(name="chat-dec", step=0, profiler_outdir=log_path)
optimizer = tf.keras.optimizers.Adam()
checkpoint_dir = "./models"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(
optimizer=optimizer,
encoder=encoder,
decoder=decoder
)
    # Train the model
train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix,summary_writer)
    # Restore the model for prediction
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    # Chat prediction
print("====机器人1号为您服务====")
while True:
inputs = input("用户:")
if inputs == "q":
exit()
chat(inputs,a_max_len, q_max_len, q_index, a_index, encoder, decoder)
# chat_image(inputs,a_max_len, q_max_len, q_index, a_index, encoder, decoder)
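
On the question at the top: a subclassed tf.keras.Model such as the Encoder/Decoder above cannot, in general, be written to an .h5 file, because the HDF5 format only stores Sequential/Functional topologies. What does work for subclassed models is tf.train.Checkpoint (already used in this script) or saving in the SavedModel format. To train with model.fit, one option is to wrap the encoder/decoder pair in a single keras.Model with a custom train_step that reproduces the teacher-forcing loop from grad_loss. The sketch below only illustrates that idea; the class name Seq2SeqChat and its constructor arguments are my own assumptions, it is untested, and it reuses the masked loss() function defined above.

import tensorflow as tf
from tensorflow import keras

class Seq2SeqChat(keras.Model):
    """Wraps the Encoder and Decoder so that model.fit can drive training."""
    def __init__(self, encoder, decoder, a_index, batch_sz):
        super(Seq2SeqChat, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.a_index = a_index      # answer tokenizer, provides the <start> id
        self.batch_sz = batch_sz

    def train_step(self, data):
        # data is one (question batch, answer batch) pair from the dataset
        q, a = data
        q_hidden = self.encoder.initialize_hidden_state()
        loss_value = 0.0
        with tf.GradientTape() as tape:
            q_output, q_hidden = self.encoder(q, q_hidden)
            a_hidden = q_hidden
            # First decoder input: <start> for every sequence in the batch
            a_input = tf.expand_dims(
                [self.a_index.word_index["<start>"]] * self.batch_sz, 1)
            for t in range(1, a.shape[1]):
                predictions, a_hidden, _ = self.decoder(a_input, a_hidden, q_output)
                # loss() is the masked loss defined earlier in this file
                loss_value += loss(a[:, t], predictions)
                # Teacher forcing: feed the ground-truth token back in
                a_input = tf.expand_dims(a[:, t], 1)
        variables = self.encoder.trainable_variables + self.decoder.trainable_variables
        grads = tape.gradient(loss_value, variables)
        self.optimizer.apply_gradients(zip(grads, variables))
        return {"loss": loss_value / int(a.shape[1])}

# Hypothetical usage (assumes the dataset, encoder and decoder built above):
# seq2seq = Seq2SeqChat(encoder, decoder, a_index, BATCH_SIZE)
# seq2seq.compile(optimizer=tf.keras.optimizers.Adam())
# seq2seq.fit(dataset, epochs=200)
# Saving: seq2seq.save("./models/seq2seq", save_format="tf") would additionally need
# a call() method to trace; without one, tf.train.Checkpoint (as above) remains the
# simplest way to persist and restore the weights.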