2501_93162994 2025-09-15 15:58 采纳率: 0%
浏览 4

模糊认知图只有节点没有边

对实验数据进行模糊认知图分析,企图找到节点之间的因果关系,可在输出的图片中,发现只有节点,没有边。数据处理显示没问题,并非平滑。

img

import numpy as np
import os
import time
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler

# 设置中文字体
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
plt.rcParams["font.size"] = 10


class GPUFuzzyCognitiveMap:
    """基于GPU加速的模糊认知图类"""

    def __init__(self, num_concepts, concept_names=None, device=None):
        """
        初始化模糊认知图
        num_concepts: 概念节点数量
        concept_names: 概念节点名称列表
        device: 计算设备 ('cuda'或'cpu')
        """
        self.num_concepts = num_concepts
        self.concept_names = concept_names if concept_names else [f"概念{i}" for i in range(num_concepts)]

        # 自动选择设备(优先GPU)
        self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"使用计算设备: {self.device}")

        # 初始化邻接矩阵(因果权重),范围[-1, 1],存储在GPU上
        self.weights = torch.zeros((num_concepts, num_concepts), device=self.device, dtype=torch.float32)

        # 概念状态值,范围[0, 1]
        self.states = torch.zeros(num_concepts, device=self.device, dtype=torch.float32)

    def sigmoid_activation(self, x, alpha=1.0):
        """GPU加速的sigmoid激活函数"""
        return torch.sigmoid(alpha * x)

    def learn_from_data(self, data, max_iter=100, learning_rate=0.01, alpha=0.5):
        """
        从数据中学习FCM权重(GPU加速版本)
        data: 输入数据,形状为(样本数, 时间步长, 概念数)
        max_iter: 最大迭代次数
        learning_rate: 学习率
        alpha: 激活函数参数
        """
        # 将数据转换为GPU张量
        data_tensor = torch.tensor(data, device=self.device, dtype=torch.float32)
        n_samples, n_time, n_concepts = data_tensor.shape
        assert n_concepts == self.num_concepts, "数据维度与FCM概念数不匹配"

        # 数据归一化到[0, 1](在CPU上预处理后转移到GPU)
        scaler = MinMaxScaler()
        flattened_data = data.reshape(-1, n_concepts)
        normalized_data = scaler.fit_transform(flattened_data).reshape(n_samples, n_time, n_concepts)
        normalized_tensor = torch.tensor(normalized_data, device=self.device, dtype=torch.float32)

        # 初始化权重梯度累加器(用于并行更新)
        weights_grad = torch.zeros_like(self.weights)

        # 迭代学习权重
        for iter in range(max_iter):
            total_error = 0.0

            # 批量处理样本(利用GPU并行性)
            for s in range(n_samples):
                sample = normalized_tensor[s]

                # 初始化状态
                self.states = sample[0].clone()

                # 时间序列上的因果学习
                for t in range(1, n_time):
                    prev_states = self.states.clone()
                    target_states = sample[t]

                    # 计算预测状态(矩阵乘法在GPU上并行执行)
                    predicted_states = self.sigmoid_activation(
                        torch.matmul(prev_states, self.weights), alpha
                    )

                    # 计算误差
                    error = target_states - predicted_states
                    total_error += torch.mean(torch.abs(error)).item()

                    # 计算权重梯度(并行更新所有权重)
                    outer_product = torch.outer(prev_states, error)
                    weights_grad += learning_rate * outer_product * (1 - torch.eye(n_concepts, device=self.device))

            # 应用梯度更新并限制权重范围
            self.weights += weights_grad / (n_samples * (n_time - 1))
            self.weights = torch.clamp(self.weights, -1, 1)
            weights_grad.zero_()  # 重置梯度累加器

            # 打印学习进度
            if (iter + 1) % 10 == 0:
                avg_error = total_error / (n_samples * (n_time - 1))
                print(f"迭代 {iter + 1}/{max_iter}, 平均误差: {avg_error:.6f}")

        return self.weights.cpu().numpy()  # 返回CPU上的numpy数组


def build_fcm_from_eeg(dataset, concept_names, max_iter=100, learning_rate=0.01):
    """从EEG数据构建GPU加速的模糊认知图"""
    n_samples, n_time, n_channels = dataset.shape

    # 初始化FCM,每个电极作为一个概念节点
    fcm = GPUFuzzyCognitiveMap(n_channels, concept_names)

    # 从数据中学习因果权重(GPU加速)
    print("开始学习模糊认知图权重...")
    start_time = time.time()
    weights = fcm.learn_from_data(
        dataset,
        max_iter=max_iter,
        learning_rate=learning_rate
    )
    learn_time = time.time() - start_time
    print(f"模糊认知图学习完成,耗时: {learn_time:.2f}秒")

    return fcm, weights


def visualize_fcm(fcm, threshold=0.1, save_dir=None):
    """可视化模糊认知图(与CPU版本相同)"""
    # 从GPU获取权重(转为numpy)
    weights = fcm.weights.cpu().numpy()

    # 创建有向图
    G = nx.DiGraph()

    # 添加节点
    for i, name in enumerate(fcm.concept_names):
        G.add_node(i, label=name)

    # 添加边(只添加权重绝对值超过阈值的)
    edges = []
    for i in range(fcm.num_concepts):
        for j in range(fcm.num_concepts):
            weight = weights[i, j]
            if i != j and abs(weight) > threshold:
                edges.append((i, j, weight))
                G.add_edge(i, j, weight=weight)

    # 布局设置
    pos = nx.spring_layout(G, k=0.5, iterations=50)

    # 绘制节点
    plt.figure(figsize=(18, 16))
    nx.draw_networkx_nodes(G, pos, node_size=3000, node_color='lightblue')

    # 绘制标签
    labels = {i: fcm.concept_names[i].split('(')[0] for i in range(fcm.num_concepts)}
    nx.draw_networkx_labels(G, pos, labels, font_size=10)

    # 绘制边(区分正相关和负相关)
    positive_edges = [(i, j) for i, j, w in edges if w > 0]
    negative_edges = [(i, j) for i, j, w in edges if w < 0]
    edge_weights = {(i, j): f"{w:.2f}" for i, j, w in edges}

    nx.draw_networkx_edges(G, pos, edgelist=positive_edges, edge_color='red',
                           width=[abs(w) * 5 for _, _, w in edges if w > 0],
                           arrowstyle='->', arrowsize=20)
    nx.draw_networkx_edges(G, pos, edgelist=negative_edges, edge_color='blue',
                           width=[abs(w) * 5 for _, _, w in edges if w < 0],
                           arrowstyle='->', arrowsize=20)

    # 绘制边权重
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_weights, font_size=8)

    plt.title("EEG数据模糊认知图(因果关联关系)", fontsize=16)
    plt.axis('off')

    # 保存图像
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, "fcm_causal_graph.png")
        plt.tight_layout()
        plt.savefig(save_path, dpi=600, bbox_inches="tight")
        print(f"模糊认知图已保存至: {save_path}")

    plt.close()


def get_top_causal_relationships(fcm, top_n=10, threshold=0.1):
    """提取并返回前N强因果关系(与CPU版本相同)"""
    weights = fcm.weights.cpu().numpy()
    relationships = []

    for i in range(fcm.num_concepts):
        for j in range(fcm.num_concepts):
            weight = weights[i, j]
            if i != j and abs(weight) > threshold:
                relationships.append({
                    "源节点": fcm.concept_names[i],
                    "目标节点": fcm.concept_names[j],
                    "因果强度": abs(weight),
                    "关联类型": "正相关" if weight > 0 else "负相关",
                    "权重值": weight
                })

    # 按因果强度排序
    relationships.sort(key=lambda x: x["因果强度"], reverse=True)

    # 添加排名
    for idx, rel in enumerate(relationships):
        rel["排名"] = idx + 1

    return relationships[:top_n], relationships


def main():
    start_time = time.time()

    # 路径设置
    input_dir = "./intermediate_results"
    output_dir = "./analysis_results/gpu_fcm_causal_relationship"
    os.makedirs(output_dir, exist_ok=True)

    # 加载预处理的EEG数据
    try:
        dataset = np.load(os.path.join(input_dir, "preprocessed_eeg.npy"))
        print(f"数据加载成功,形状: {dataset.shape} (样本数×时间步长×电极数)")
    except FileNotFoundError:
        print(f"错误:未找到预处理数据,请检查路径 {input_dir}")
        return

    # 电极名称(概念节点名称)
    electrodes = [
        "Fp1(左额极)", "Fp2(右额极)", "F7(左前颞)", "F3(左额)", "Fz(额中)",
        "F4(右额)", "F8(右前颞)", "T3(左中颞)", "C3(左中央)", "Cz(中央中)",
        "C4(右中央)", "T4(右中颞)", "T5(左后颞)", "P3(左顶)", "Pz(顶中)",
        "P4(右顶)", "T6(右后颞)", "O1(左枕)", "O2(右枕)"
    ]

    # 构建模糊认知图(GPU加速)
    fcm, weights = build_fcm_from_eeg(
        dataset,
        concept_names=electrodes,
        max_iter=100,  # 迭代次数
        learning_rate=0.06  # 学习率
    )

    # 保存FCM权重矩阵
    np.save(os.path.join(output_dir, "fcm_weights.npy"), weights)
    print(f"模糊认知图权重矩阵已保存至: {os.path.join(output_dir, 'fcm_weights.npy')}")

    # 可视化模糊认知图
    visualize_fcm(fcm, threshold=0.1, save_dir=output_dir)

    # 获取并保存前10强因果关系
    top_relationships, all_relationships = get_top_causal_relationships(fcm, top_n=10, threshold=0.1)

    # 保存所有因果关系到CSV
    all_df = pd.DataFrame(all_relationships)
    all_df.to_csv(os.path.join(output_dir, "all_causal_relationships.csv"), index=False, encoding="utf-8-sig")

    # 保存前10强因果关系到Excel
    top_df = pd.DataFrame(top_relationships)
    top_df.to_excel(os.path.join(output_dir, "top_10_causal_relationships.xlsx"), index=False)

    # 打印前10强因果关系
    print("\n" + "=" * 80)
    print(f"前10强因果关系(GPU加速模糊认知图分析结果)")
    print("=" * 80)
    print(f"{'排名':<6} {'源节点':<12} {'目标节点':<12} {'关联类型':<8} {'因果强度':<10} {'权重值':<10}")
    print("-" * 80)
    for rel in top_relationships:
        print(f"{rel['排名']:<6} {rel['源节点']:<12} {rel['目标节点']:<12} {rel['关联类型']:<8} "
              f"{rel['因果强度']:.4f}      {rel['权重值']:.4f}")
    print("=" * 80 + "\n")

    # 总耗时
    total_time = time.time() - start_time
    print(f"分析完成,总耗时: {total_time:.2f}秒 ({total_time / 60:.1f}分钟)")


if __name__ == "__main__":
    # 确保CUDA初始化(如果可用)
    if torch.cuda.is_available():
        torch.cuda.init()
    main()

  • 写回答

4条回答 默认 最新

  • 檀越@新空间 2025-09-15 15:59
    关注

    下午好🌅🌅🌅
    本答案参考通义千问

    在您的代码中,您尝试使用模糊认知图(Fuzzy Cognitive Map, FCM)对实验数据进行分析,并期望通过FCM找出节点之间的因果关系。然而,在输出的图片中发现只有节点,没有边,这表明权重矩阵(即因果关系)未被正确构建或可视化


    一、问题分析

    1. 权重矩阵未被正确学习

    从代码逻辑来看,learn_from_data 方法中虽然进行了权重的学习,但可能存在以下问题:

    • 梯度更新方式不正确:当前的 weights_grad 是逐样本计算并累加,但由于 outer_product 的计算方式和 learning_rate 的应用可能存在问题。
    • 权重矩阵初始化为零:初始权重全部为0,可能导致模型无法学习到任何有效的因果关系。
    • 没有正则化机制:导致权重可能无法收敛,最终仍为零。

    2. 可视化部分未正确绘制边

    在将结果可视化时,如果未正确使用邻接矩阵中的权重值来绘制边,则即使有权重,也可能只显示节点。


    二、解决方案

    1. 修正权重学习过程

    修改后的 learn_from_data 方法

    def learn_from_data(self, data, max_iter=100, learning_rate=0.01, alpha=0.5):
        """
        从数据中学习FCM权重(GPU加速版本)
        data: 输入数据,形状为(样本数, 时间步长, 概念数)
        max_iter: 最大迭代次数
        learning_rate: 学习率
        alpha: 激活函数参数
        """
        # 将数据转换为GPU张量
        data_tensor = torch.tensor(data, device=self.device, dtype=torch.float32)
        n_samples, n_time, n_concepts = data_tensor.shape
        assert n_concepts == self.num_concepts, "数据维度与FCM概念数不匹配"
    
        # 数据归一化到[0, 1](在CPU上预处理后转移到GPU)
        scaler = MinMaxScaler()
        flattened_data = data.reshape(-1, n_concepts)
        normalized_data = scaler.fit_transform(flattened_data).reshape(n_samples, n_time, n_concepts)
        normalized_tensor = torch.tensor(normalized_data, device=self.device, dtype=torch.float32)
    
        # 初始化权重(随机初始化,避免全零)
        self.weights = torch.rand((n_concepts, n_concepts), device=self.device, dtype=torch.float32) * 2 - 1  # [-1, 1]
    
        # 迭代学习权重
        for iter in range(max_iter):
            total_error = 0.0
    
            # 批量处理样本(利用GPU并行性)
            for s in range(n_samples):
                sample = normalized_tensor[s]
    
                # 初始化状态
                self.states = sample[0].clone()
    
                # 时间序列上的因果学习
                for t in range(1, n_time):
                    prev_states = self.states.clone()
                    target_states = sample[t]
    
                    # 计算预测状态(矩阵乘法在GPU上并行执行)
                    predicted_states = self.sigmoid_activation(
                        torch.matmul(prev_states, self.weights), alpha
                    )
    
                    # 计算误差
                    error = target_states - predicted_states
                    total_error += torch.mean(torch.abs(error)).item()
    
                    # 计算权重梯度
                    grad = learning_rate * torch.outer(prev_states, error)
                    self.weights -= grad  # 更新权重
    
                    # 限制权重范围
                    self.weights = torch.clamp(self.weights, -1, 1)
    
            # 打印学习进度
            if (iter + 1) % 10 == 0:
                avg_error = total_error / (n_samples * (n_time - 1))
                print(f"迭代 {iter + 1}/{max_iter}, 平均误差: {avg_error:.6f}")
    
        return self.weights.cpu().numpy()  # 返回CPU上的numpy数组
    

    重点说明:

    • 权重初始化为随机值,而不是全零,以打破对称性。
    • 直接更新权重,不再使用梯度累加器。
    • 限制权重范围 在每次更新后,防止数值发散。

    2. 修正可视化部分

    可视化方法示例

    def visualize_fcm(weights, concept_names, threshold=0.1):
        """
        可视化模糊认知图
        weights: 权重矩阵 (n_concepts x n_concepts)
        concept_names: 节点名称列表
        threshold: 边权重阈值,低于此值的边不显示
        """
        G = nx.DiGraph()
    
        # 添加节点
        for i, name in enumerate(concept_names):
            G.add_node(i, label=name)
    
        # 添加边
        for i in range(weights.shape[0]):
            for j in range(weights.shape[1]):
                weight = weights[i, j]
                if abs(weight) > threshold:
                    G.add_edge(i, j, weight=weight)
    
        # 绘制图形
        pos = nx.spring_layout(G, seed=42)
        plt.figure(figsize=(10, 8))
    
        # 绘制节点
        nx.draw_networkx_nodes(G, pos, node_size=800, node_color='lightblue')
    
        # 绘制边
        edge_labels = {(u, v): f"{w:.2f}" for u, v, w in G.edges.data('weight')}
        nx.draw_networkx_edges(G, pos, width=2, alpha=0.7, edge_color='gray')
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=10)
    
        # 绘制标签
        nx.draw_networkx_labels(G, pos, labels={i: name for i, name in enumerate(concept_names)}, font_size=10)
    
        plt.title("模糊认知图 (FCM) 可视化")
        plt.axis("off")
        plt.show()
    

    重点说明:

    • 使用 nx.DiGraph() 创建有向图。
    • 设置权重阈值,避免过多边影响可读性。
    • 添加边标签,显示权重值。

    3. 示例调用

    # 假设你有 EEG 数据(样本数 x 时间步 x 通道数)
    # dataset = np.random.rand(100, 10, 4)  # 示例数据
    # concept_names = ['Channel1', 'Channel2', 'Channel3', 'Channel4']
    
    # 构建FCM
    fcm = GPUFuzzyCognitiveMap(len(concept_names), concept_names)
    weights = fcm.learn_from_data(dataset)
    
    # 可视化
    visualize_fcm(weights, concept_names)
    

    三、总结

    | 步骤 | 内容 | |------|------| | 1 | 初始化权重为随机值,避免全零 | | 2 | 直接更新权重,不再使用梯度累加器 | | 3 | 限制权重范围,防止数值发散 | | 4 | 设置边权重阈值,避免过多边 | | 5 | 可视化时使用 networkxDiGraphedge_labels,确保边被正确绘制 |

    建议: 如果仍然无法看到边,请检查 weights 是否包含非零值,可以打印 weights 的前几行查看是否有效。

    如果您提供具体的实验数据或报错信息,我可以进一步帮助您调试。

    评论

报告相同问题?

问题事件

  • 创建了问题 9月15日