onhuko · 2023-04-16 13:40 · acceptance rate: 58.3%
84 views
Question closed

Saving a custom neural network model as an .h5 file

Can the custom neural network model below be saved as an .h5 file? If not, can it be converted into a Keras network structure and then saved with model.save? If so, please also show how to train it with model.fit.

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers 
from dialog_cut import process_cut
from question_answer import question_answer
import os
from datetime import datetime
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import io
import unicodedata
import re
from matplotlib.font_manager import FontProperties
font = FontProperties(fname="/Library/Fonts/Songti.ttc",size=8)


def source_data(source_path):
    """生成对话数据
    参数:
    source_path:
    返回:
        questions: 问题数据集
        answers: 答案数据集
    """
    # 获取完整对话
    convs = process_cut(source_path, None)
    # 获取问题和答案对话集
    questions, answers = question_answer(convs)
    return questions, answers
def tokenize(datas):
    """数据集处理为向量和字典
    参数:
        datas: 数据集列表
    返回:
        voc_li: 数据集向量
        tokenizer: 数据集字典
    """
    # 数据序列化为向量实例化
    tokenizer = keras.preprocessing.text.Tokenizer(filters="")
    tokenizer.fit_on_texts(datas)
    # 数据系列化为向量
    voc_li = tokenizer.texts_to_sequences(datas)
    # 数据向量填充
    voc_li = keras.preprocessing.sequence.pad_sequences(
        voc_li, padding="post"
    )
    # 返回数据
    return voc_li, tokenizer

def max_length(vectors):
    """获取数据集最长对话
    参数:
        vectors: 词向量
    返回:
        最长对话单字量
    """
    return max(len(vector) for vector in vectors)
def convert(index, vectors):
    """向量与单字对应关系
    参数
        index:字典
        vectors:词向量
    返回:
        无
    """
    for vector in vectors:
        if vector != 0:
            print("{}-->{}".format(vector, index.index_word[vector])) 

class Encoder(tf.keras.Model):
    """编码器"""
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        # 批数据量
        self.batch_sz = batch_sz
        # 编码单元
        self.enc_units = enc_units
        # 词向量嵌入对象
        self.embedding = keras.layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU模型
        self.gru = keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform"
        )
    @tf.function
    def call(self, x, hidden):
        """编码器输出"""
        x = self.embedding(x)
        output, state = self.gru(x, initial_state=hidden)
        return output, state
    def initialize_hidden_state(self):
        """初始化隐藏层状态"""
        return tf.zeros((self.batch_sz, self.enc_units))

class BahdanauAttentionMechanism(tf.keras.layers.Layer):
    """Bahdanau注意力机制"""
    def __init__(self, units):
        super(BahdanauAttentionMechanism, self).__init__()
        # 隐藏层1
        self.W1 = layers.Dense(units)
        # 隐藏层2
        self.W2 = layers.Dense(units)
        # 输出层
        self.V = layers.Dense(1)
    @tf.function
    def call(self, query, values):
        """权重计算
        参数:
            query: 向量
            values: 隐藏层值
        返回:
            词向量
            词向量权重
        """
        hidden_with_time_axis = tf.expand_dims(query, 1)
        # 词权重分数
        score = self.V(
            tf.nn.tanh(
                self.W1(values)+self.W2(hidden_with_time_axis)
            )
        )
        # 注意力权重
        attention_weights = tf.nn.softmax(score, axis=1)
        # 词向量权重
        context_vector = attention_weights * values
        context_vector = tf.math.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights

class Decoder(tf.keras.Model):
    """解码器"""
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        # 批量尺寸
        self.batch_sz = batch_sz 
        # 解码单元 
        self.dec_units = dec_units
        # 词嵌入 
        self.embedding = layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU模块
        self.gru = layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform"
        )
        # 全连接层
        self.fc = layers.Dense(vocab_size)
        # 注意力计算
        self.attention = BahdanauAttentionMechanism(self.dec_units)
    @tf.function
    def call(self, x, hidden, enc_output):
        """解码计算
        参数:
            x: 隐藏层输入
            hidden: 隐藏层状态
            enc_output: 编码器输出
        返回:
            x: 解码器输出
            state: 隐藏层状态
            attention_weights: 注意力权重
        """
        # 词向量与注意力权重
        context_vector, attention_weights = self.attention(
            hidden,
            enc_output)
        # 词嵌入
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x],axis=-1)
        # GRU就算
        output, state = self.gru(x)
        # 输出
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, attention_weights

def loss(real, pred):
    """损失值计算
    参数:
        标签值(对话语料答案)
        预测值(解码器输出答案)
    返回:
        损失值
    """
    # 逻辑计算
    mask = tf.math.logical_not(
        tf.math.equal(real, 0)
    )
    # 损失函数对象
    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction="none"
    )
    # 计算损失值
    loss_value = loss_obj(real, pred)
    mask = tf.cast(mask, dtype=loss_value.dtype)
    loss_value *= mask
    # 返回损失值均值
    return tf.math.reduce_mean(loss_value)

def grad_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE):
    """计算损失函数值并获取梯度优化对象
    参数:
        q: 问题
        a: 答案
        q_hidden: 编码器隐藏层输出
        encoder: 编码器对象
        decoder: 解码器对象
        q_index: 问题字典
        BATCH_SIZE: 批量数据尺寸
    返回:
        批量数据损失值
        梯度优化对象
    """
    loss_value = 0
    with tf.GradientTape() as tape:
        q_output, q_hidden = encoder(q, q_hidden)
        a_hidden = q_hidden
        a_input = tf.expand_dims(
            [a_index.word_index["<start>"]]*BATCH_SIZE,1)
        for vector in range(1, a.shape[1]):
            predictions, a_hidden, _ = decoder(a_input, a_hidden, q_output)
            loss_value += loss(a[:,vector], predictions)
            a_input = tf.expand_dims(a[:, vector],1)
        batch_loss = (loss_value / int(a.shape[1]))
        variables = encoder.trainable_variables + decoder.trainable_variables 
        return batch_loss, tape.gradient(loss_value, variables)

def optimizer_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer):
    """优化失函数
    参数:
        q: 问题
        a: 答案
        q_hidden: 编码器隐藏层输出
        encoder: 编码器对象
        decoder: 解码器对象
        q_index: 问题字典
        BATCH_SIZE: 批量数据尺寸
        optimizer: 优化器
    返回:
        批量数据损失值
    """
    # optimizer = tf.keras.optimizers.Adam()
    batch_loss, grads = grad_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE)
    variables = encoder.trainable_variables + decoder.trainable_variables
    optimizer.apply_gradients(zip(grads, variables))
    return batch_loss

def train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix,summary_writer):
    """训练模型
    参数:
        q_hidden: 编码器隐藏层输出
        encoder: 编码器对象
        decoder: 解码器对象
        q_index: 问题字典
        BATCH_SIZE: 批量数据尺寸
        dataset: 问答语料数据集
        steps_per_epoch: 每轮训练迭代次数
        optimizer: 优化器
        checkpoint: 模型保存类对象
        checkpoint_prefix: 模型保存路径
        summary_writer: 日志保存对象
    返回:
        无
    """
    # 保存模型标志位
    i = 0
    # 训练次数
    EPOCHS = 200
    # 迭代训练
    for epoch in range(EPOCHS):
        # 起始时间
        start = time.time()
        # 隐藏层初始化
        a_hidden = encoder.initialize_hidden_state()
        # 总损失
        total_loss = 0
        # 问答数据集解析
        for (batch, (q, a)) in enumerate(dataset.take(steps_per_epoch)):
            # 批量损失值
            batch_loss = optimizer_loss(q,a,q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer)
            # 总损失之
            total_loss += batch_loss
            with summary_writer.as_default():
                tf.summary.scalar("batch loss", batch_loss.numpy(), step=epoch)
            # 每训练100组对话输出一次结果
            if batch % 100 == 0:
                print("第{}次训练,第{}批数据损失值:{:.4f}".format(
                    epoch+1,
                    batch+1,
                    batch_loss.numpy()
                ))
        # 训练100轮保存一次模型
        with summary_writer.as_default():
            tf.summary.scalar("total loss", total_loss/steps_per_epoch,step=epoch)
        if(epoch+1) % 100 == 0:
            i += 1
            print("====第{}次保存训练模型====".format(i))
            checkpoint.save(file_prefix=checkpoint_prefix)
        print("第{}次训练,总损失值:{:.4f}".format(epoch+1, total_loss/steps_per_epoch))
        print("训练耗时:{:.1f}秒".format(time.time()-start))

def preprocess_question(question):
    """问题数据集处理,添加开始和结束标志
    参数:
        question: 问题
    返回:
        处理后的问题
    """
    question = "<start> " + " ".join(question) + " <end>"
    return question

def answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """答案向量解码
    参数
    question: 问题
    a_max_len: 答案最大长度
    q_max_len: 问题最大长度
    q_index: 问题字典
    a_index: 答案索引
    encoder: 编码器对象
    decoder: 解码器对象
    返回
        result: 答案向量解码后的答案
        question: 问题
        attention_plot: 词向量权重
    """
    # 词向量权重初始化
    attention_plot = np.zeros((a_max_len, q_max_len))
    # 问题预处理
    question = preprocess_question(question)
    # 问题转词向量
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # 问题序列化
    inputs = keras.preprocessing.sequence.pad_sequences(
        [inputs],
        maxlen=q_max_len,
        padding="post"
    )
    # 问题字符转张量
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # 隐藏层状态
    hidden = [tf.zeros((1, units))]
    # 编码器输出和隐藏层状态
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # 解码器输入扩充维度 
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # 词向量解码
    for t in range(a_max_len):
        predictions, a_hidden, attention_weights = decoder(
            a_input,
            a_hidden,
            q_out
        )
        # 词向量权重
        attention_weights = tf.reshape(attention_weights, (-1,))
        attention_plot[t] = attention_weights.numpy()
        # 预测值索引
        predicted_id = tf.argmax(predictions[0]).numpy()
        # 预测值处理,去除<end>
        result += a_index.index_word[predicted_id]
        if a_index.index_word[predicted_id] != "<end>":
            result += a_index.index_word[predicted_id]
        else:
            return result, question, attention_plot
        # 问题答案作为解码器输入
        a_input = tf.expand_dims([predicted_id], 0)
    # 返回数据
    return result, question, attention_plot

def answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """答案向量解码
    参数
    question: 问题
    a_max_len: 答案最大长度
    q_max_len: 问题最大长度
    q_index: 问题字典
    a_index: 答案索引
    encoder: 编码器对象
    decoder: 解码器对象
    返回
        result: 答案向量解码后的答案
        question: 问题
        attention_plot: 词向量权重
    """
    # 词向量权重初始化
    attention_plot = np.zeros((a_max_len, q_max_len))
    # 问题预处理
    question = preprocess_question(question)
    # 问题转词向量
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # 问题序列化
    inputs = keras.preprocessing.sequence.pad_sequences(
        [inputs],
        maxlen=q_max_len,
        padding="post"
    )
    # 问题字符转张量
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # 隐藏层状态
    hidden = [tf.zeros((1, units))]
    # 编码器输出和隐藏层状态
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # 解码器输入扩充维度 
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # 词向量解码
    for t in range(a_max_len):
        predictions, a_hidden, attention_weights = decoder(
            a_input,
            a_hidden,
            q_out
        )
        # 词向量权重
        attention_weights = tf.reshape(attention_weights, (-1,))
        attention_plot[t] = attention_weights.numpy()
        # 预测值索引
        predicted_id = tf.argmax(predictions[0]).numpy()
        # 生成答案
        result += a_index.index_word[predicted_id]+" "
        if a_index.index_word[predicted_id] == "<end>":
            return result, question, attention_plot
        # 问题答案作为解码器输入
        a_input = tf.expand_dims([predicted_id], 0)
    # 返回数据
    return result, question, attention_plot

def plot_attention(attention, question, predicted):
    """绘制问题和答案混淆矩阵
    参数:
        attention:注意力参数
        question: 问题
        predicted: 预测值
    返回:
        无
    """
    # 新建绘图区
    fig = plt.figure(figsize=(6,6))
    # 添加分区
    ax = fig.add_subplot(1,1,1)
    # 矩阵信息写入绘图区
    # ax.matshow(attention, cmap="viridis")
    ax.matshow(attention, cmap=plt.cm.Blues)
    # 设置字体尺寸
    fontdict={"fontsize":6}
    # x轴显示数据
    ax.set_xticklabels([""]+question, fontdict=fontdict, rotation=90,fontproperties=font)
    # y轴显示数据
    ax.set_yticklabels([""]+predicted, fontdict=fontdict, fontproperties=font)
    # x轴设置位置
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    # y轴设置位置
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
    plt.savefig("./images/q_a_image.png", format="png", dpi=300)
    plt.show()
def chat(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """对话
    参数
    question: 问题
    a_max_len: 答案最大长度
    q_max_len: 问题最大长度
    q_index: 问题字典
    a_index: 答案索引
    encoder: 编码器对象
    decoder: 解码器对象
    返回
        无
    """
    result, question, attention_plot = answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
    print("机器人:", result)

def chat_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """对话
    参数
    question: 问题
    a_max_len: 答案最大长度
    q_max_len: 问题最大长度
    q_index: 问题字典
    a_index: 答案索引
    encoder: 编码器对象
    decoder: 解码器对象
    返回
        无
    """
    result, question, attention_plot = answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
    print("机器人:", result)
    attention_plot = attention_plot[:len(result.split(" ")),:len(question.split(" "))]
    plot_attention(attention_plot, question.split(" "), result.split(" "))


if __name__ == "__main__":
    stamp = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    source_path = "./data/source_data.conv"
    # Download the spa-eng sample data (leftover from the translation tutorial;
    # the questions/answers below actually come from source_data)
    path_to_zip = tf.keras.utils.get_file(
        'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
        extract=True)
    path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"
    # answers, questions  = create_dataset(path_to_file, 24000)
    # q_vec, q_index = tokenize(questions)
    # a_vec, a_index = tokenize(answers)
    questions, answers = source_data(source_path)
    q_vec, q_index = tokenize(questions)
    a_vec, a_index = tokenize(answers)
    print("voc:", q_vec)
    print("tokenize:", q_index.index_word)
    print("voc:", a_vec)
    print("tokenize:", a_index.index_word)
    
    q_max_len = max_length(q_vec)
    a_max_len = max_length(a_vec)
    convert(q_index, q_vec[0])
    BUFFER_SIZE = len(q_vec)
    print("buffer size:", BUFFER_SIZE)
    BATCH_SIZE = 64
    steps_per_epoch = len(q_vec)//BATCH_SIZE
    embedding_dim = 256
    units = 1024
    q_vocab_size = len(q_index.word_index)+1
    a_vocab_size = len(a_index.word_index)+1
    dataset = tf.data.Dataset.from_tensor_slices(
        (q_vec, a_vec)
    ).shuffle(BUFFER_SIZE)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    # Dataset iteration test
    # for(batch,(q, a)) in enumerate(dataset.take(steps_per_epoch)):
    #     print("batch:",batch)
    #     print("question:",q)
    #     print("answer:",a)
    # Normal training
    q_batch, a_batch = next(iter(dataset))
    print("question batch:",q_batch.shape)
    print("answer batch:", a_batch.shape)
    log_path = "./logs/chat"+stamp.replace(":","-")
    summary_writer = tf.summary.create_file_writer(log_path)
    tf.summary.trace_on(graph=True, profiler=True)
    encoder = Encoder(
        q_vocab_size,
        embedding_dim,
        units,
        BATCH_SIZE)
    q_hidden = encoder.initialize_hidden_state()
    q_output, q_hidden = encoder(q_batch, q_hidden)
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-en", step=0, profiler_outdir=log_path)

    tf.summary.trace_on(graph=True, profiler=True)
    attention_layer = BahdanauAttentionMechanism(10)
    attention_result, attention_weights = attention_layer(
        q_hidden, q_output
    )
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-atten", step=0, profiler_outdir=log_path)
    
    tf.summary.trace_on(graph=True, profiler=True)
    decoder = Decoder(
        a_vocab_size,
        embedding_dim,
        units,
        BATCH_SIZE
    )
    a_output, _, _ = decoder(
        tf.random.uniform((64,1)),
        q_hidden,
        q_output
    )
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-dec", step=0, profiler_outdir=log_path)
    optimizer = tf.keras.optimizers.Adam()
    checkpoint_dir = "./models"
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(
        optimizer=optimizer, 
        encoder=encoder, 
        decoder=decoder
    )
    # Train the model
    train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix,summary_writer)
    # Restore the latest checkpoint for prediction
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    # Chat loop
    print("==== Chatbot No. 1 at your service ====")
    while True:
        inputs = input("User: ")
        if inputs == "q":
            exit()
        chat(inputs, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
        # chat_image(inputs,a_max_len, q_max_len, q_index, a_index, encoder, decoder)


2 answers

  • 努力の小熊 (winner, National College Student IoT Technology and Application Competition) · 2023-04-16 19:10

    Answer by 努力の小熊, combining the latest ChatGPT-4:
    Saving this custom model as an .h5 file is probably not the best option here, because the HDF5 format does not handle models with custom layers and custom training logic well. Instead, you can use TensorFlow's SavedModel format to save the whole model, including its architecture, optimizer state and trained weights.
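
    If a weights-only save is enough for you, that does work for these subclassed models (it will not be a single .h5 file, though). A minimal sketch, assuming encoder and decoder have already been built by a forward pass, as they are in your __main__ block:

    # Sketch: weights-only saving for the subclassed Encoder/Decoder.
    # The extension-less path uses the TensorFlow checkpoint format,
    # which is the safe choice for subclassed models.
    encoder.save_weights("./models/encoder_weights")
    decoder.save_weights("./models/decoder_weights")

    # To restore: rebuild the objects with the same constructor arguments,
    # run one forward pass so the variables are created, then load the weights.
    new_encoder = Encoder(q_vocab_size, embedding_dim, units, BATCH_SIZE)
    new_encoder(q_batch, new_encoder.initialize_hidden_state())
    new_encoder.load_weights("./models/encoder_weights")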

    To turn the model into a Keras model, you would need to wrap the Encoder and Decoder inside a single Keras model. Here is an example that combines your Encoder and Decoder into one Keras model and trains it with model.fit:

    import tensorflow as tf
    from tensorflow.keras.layers import Input
    from tensorflow.keras.models import Model

    # Assume encoder and decoder are already defined
    # encoder = Encoder(...)
    # decoder = Decoder(...)

    # Define the inputs and outputs of the Keras model
    question_input = Input(shape=(q_max_len,), dtype=tf.int32)
    answer_input = Input(shape=(a_max_len,), dtype=tf.int32)

    q_hidden = encoder.initialize_hidden_state()
    q_out, q_hidden = encoder(question_input, q_hidden)

    # Here we only need the decoder output
    a_out, _, _ = decoder(answer_input, q_hidden, q_out)

    # Build the Keras model
    model = Model(inputs=[question_input, answer_input], outputs=a_out)

    # Compile and train the model
    EPOCHS = 200  # number of training epochs; must be defined before calling fit
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    history = model.fit([q_vec, a_vec[:, :-1]], a_vec[:, 1:], batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.1)
    

    This creates a Keras model that you can train with model.fit. However, this approach may not work with your attention mechanism as written, because your Decoder and its attention layer process one target token at a time rather than a whole answer sequence. So I suggest saving your model in the TensorFlow SavedModel format instead:

    # Save the models in SavedModel format
    tf.saved_model.save(encoder, "saved_model/encoder")
    tf.saved_model.save(decoder, "saved_model/decoder")

    # Load the models back
    loaded_encoder = tf.saved_model.load("saved_model/encoder")
    loaded_decoder = tf.saved_model.load("saved_model/decoder")
    

    This way the custom training logic is preserved when you save and load the model. If you do want to combine the attention mechanism with Keras's own training loop, you will need to adapt the code to Keras's training logic, for example by overriding train_step, as sketched below.
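
    A rough sketch of that approach, reusing your Encoder, Decoder and the module-level loss() function (untested on your data, so treat it as a starting point rather than a drop-in replacement): subclass tf.keras.Model, keep the teacher-forcing loop inside a custom train_step, and then call model.fit on your existing tf.data dataset.

    # Sketch: wrap the existing Encoder/Decoder in a Keras model with a custom
    # train_step, so model.fit can be used without giving up the attention
    # mechanism or the teacher-forcing loop.
    class Seq2SeqChat(tf.keras.Model):
        def __init__(self, encoder, decoder, a_index, batch_sz):
            super().__init__()
            self.encoder = encoder
            self.decoder = decoder
            self.start_id = a_index.word_index["<start>"]
            self.batch_sz = batch_sz

        def train_step(self, data):
            q, a = data
            loss_value = 0.0
            with tf.GradientTape() as tape:
                q_hidden = self.encoder.initialize_hidden_state()
                q_output, q_hidden = self.encoder(q, q_hidden)
                a_hidden = q_hidden
                # First decoder input is the <start> token for every sample
                a_input = tf.expand_dims([self.start_id] * self.batch_sz, 1)
                # Teacher forcing over the answer sequence
                for t in range(1, a.shape[1]):
                    predictions, a_hidden, _ = self.decoder(a_input, a_hidden, q_output)
                    loss_value += loss(a[:, t], predictions)  # module-level loss() above
                    a_input = tf.expand_dims(a[:, t], 1)
            variables = self.encoder.trainable_variables + self.decoder.trainable_variables
            grads = tape.gradient(loss_value, variables)
            self.optimizer.apply_gradients(zip(grads, variables))
            return {"loss": loss_value / int(a.shape[1])}

    # Hypothetical usage with the objects defined in your script:
    # seq2seq = Seq2SeqChat(encoder, decoder, a_index, BATCH_SIZE)
    # seq2seq.compile(optimizer=tf.keras.optimizers.Adam())
    # seq2seq.fit(dataset, epochs=200)
    # seq2seq.save_weights("./models/seq2seq_weights")  # TensorFlow checkpoint format

    Because the attention weights live inside trainable layers, they are updated by the optimizer during fit exactly as in your manual loop. Whole-model .h5 saving is still not available for subclassed models, but save_weights and the SavedModel format both work.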

    This answer was accepted by the asker as the best answer.


Question events

  • Question closed by the system on April 25
  • Answer accepted on April 17
  • Question created on April 16
