weixin_59094960 2022-07-27 14:47 采纳率: 55.6%
浏览 483
已结题

求用python匹配数据段的代码

问题遇到的现象和发生背景

我需要一个python代码,实现从数据库中匹配最相似的数据段。
我输入的数据段与匹配的数据时间长度并不一定相等,之前了解到许多是用DTW的方法来实现。许多案例是计算两个数据之间的相似度,而我需要从一堆数据库中筛选出最为匹配的数据,起点和终点都未知。也许要完成这个功能单凭DTW并不能胜任,就请帮我实现。具体请见示例图:

img

我想要达到的结果

我将一个dataframe里面某个数据输入程序,程序从我的数据库文件夹里匹配出最为相似的数据段并输出时间,只需要告诉是哪个数据文件的哪一段即可。数据库文件夹里要么全部是CSV文件,要么全部是H5,你就当时CSV吧,我可以根据情况自行修改。由于数据库里文件较多,所以希望能提升匹配速度。如果对程序中的个别代码有疑惑,希望可以与你沟通。
多写注释,不要贴图啊,我要直接ctrl C+V 运行,私信我。可以运行我就采纳了。

有兴趣的私信我,我把测试数据发给你,效果好的话我会+200

  • 写回答

8条回答 默认 最新

  • 天元浪子 Python领域优质创作者 2022-07-28 10:40
    关注

    请根据你的数据存储情况修改应用代码中的路径。如果匹配数据文件较多,建议将debug设置为False

    # -*- coding: utf-8 -*-
    
    import os, sys, time
    import numpy as np
    import pandas as pd
    from scipy import interpolate
    
    
    class CurveMatchPipe:
        """时序数据匹配流水线"""
        
        def __init__(self, sample_file, data_folder, max_var=0.1, debug=False):
            """构造函数
            
            sample_file     - 样本数据文件名
            data_folder     - 数据仓库路径
            max_var         - 偏离方差(数值越小,曲线越相似)
            """
            
            data_csv = self.read_csv(sample_file)
            if data_csv is None:
                print('样本数据文件%s缺少time列或aim列,程序终止运行。'%sample_file)
                sys.exit(1)
            
            stamp, data = self.data_cleaning(*data_csv, 'linear')
            self.sample = (data - data.mean()) / data.std()
            self.data_folder = data_folder
            self.max_var = max_var
            self.debug = debug
            self.time_cost = list()
            self.result = {
                '数据文件': list(),
                '起始时间': list(),
                '截止时间': list(),
                '起始索引': list(),
                '截止索引': list(),
                '偏离方差': list()
            }
    
        def read_csv(self, fn):
            """读取数据文件,返回时间戳数组和aim数组"""
            
            stamp, data = list(), list()
            with open(fn, 'r') as fp:
                lines = fp.readlines()
                col_names = lines[0].split(',')
                
                if 'time' in col_names:
                    idx_time = col_names.index('time')
                else:
                    return None
                
                if 'aim' in col_names:
                    idx_aim = col_names.index('aim')
                else:
                    return None
                
                for line in lines[1:]:
                    items = line.split(',')
                    stamp.append(int(items[idx_time]))
                    data.append(float(items[idx_aim]))
            
            return np.array(stamp), np.array(data)
        
        def is_continuous(self, stamp):
            """判断时间戳是否连续"""
            
            return np.where(np.diff(stamp) != 1)[0].shape[0] == 0
        
        def data_cleaning(self, stamp, data, method='linear'):
            """数据清洗。对于缺值数据默认线性插值,可选样条插值(cubic)"""
            
            if self.is_continuous(stamp):
                return stamp, data
            
            f = interpolate.interp1d(stamp, data, kind=method)
            stamp_new = np.linspace(stamp[0], stamp[-1], stamp[-1]-stamp[0]+1)
            data_new = f(stamp_new) 
            
            return np.int32(stamp_new), data_new
        
        def match(self):
            """遍历数据仓库,匹配样本数据"""
            
            for fn in os.listdir(self.data_folder):
                t0 = time.time()
                if self.debug:
                    print('正在处理文件%s...'%fn, end='')
                
                if os.path.splitext(fn)[1] != '.csv':
                    if self.debug:
                        print('忽略:文件格式错误')
                    continue
                
                data_csv = self.read_csv(os.path.join(self.data_folder, fn))
                if data_csv is None:
                    if self.debug:
                        print('忽略:缺少time列或aim列')
                    continue
                
                stamp, data = self.data_cleaning(*data_csv, 'linear')
                m, n = self.sample.shape[0], data.shape[0]
                d = np.vstack([data[i:n-m+1+i] for i in range(m)]).T
                d_mean = d.mean(axis=1).reshape(-1,1)
                d_std = d.std(axis=1).reshape(-1,1)
                d = (d - d_mean) / d_std
                diff = d - self.sample
                variance = diff.var(axis=1)
                
                for idx in np.argsort(variance):
                    if variance[idx] > self.max_var:
                        break
                    
                    self.result['数据文件'].append(fn)
                    self.result['起始时间'].append(stamp[idx])
                    self.result['截止时间'].append(stamp[idx+m])
                    self.result['起始索引'].append(idx)
                    self.result['截止索引'].append(idx+m)
                    self.result['偏离方差'].append(variance[idx])
                
                if self.debug:
                    print('完成')
                
                t1 = time.time()
                self.time_cost.append(t1-t0)
        
        def report(self, out_file=None):
            """打印DataFrame结构的匹配结果报告,若提供输出文件名,则生成excel文件"""
            
            report = pd.DataFrame(self.result)
            n = len(self.time_cost)
            total = sum(self.time_cost)
            mean = total/n
            
            if out_file:
                report.to_excel(out_file, sheet_name='匹配结果')
            else:
                print('---------------------------------------------------------------------------------')
                print(report)
            
            print('---------------------------------------------------------------------------------')
            print('共计处理%d个数据文件,累计耗时%.3f秒,单个文件平均用时%.3f秒'%(n, total, mean))
        
    
    if __name__ == '__main__':
        cmp = CurveMatchPipe('data/samples/data.csv', 'data/storehouse', max_var=0.3, debug=True)
        cmp.match()
        cmp.report('report.xlsx')
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(7条)

报告相同问题?

问题事件

  • 系统已结题 8月11日
  • 已采纳回答 8月3日
  • 修改了问题 7月28日
  • 修改了问题 7月27日
  • 展开全部

悬赏问题

  • ¥15 AT89C52单片机C语言串口助手发送数据包返回值
  • ¥15 C++数组中找第二小的数字程序纠错
  • ¥50 MATLAB APP 制作出现问题
  • ¥15 wannier复现图像时berry曲率极值点与高对称点严重偏移
  • ¥15 利用决策森林为什么会出现这样·的问题(关键词-情感分析)
  • ¥15 DispatcherServlet.noHandlerFound No mapping found for HTTP request with URI[/untitled30_war_e
  • ¥15 使用deepspeed训练,发现想要训练的参数没有梯度
  • ¥15 寻找一块做为智能割草机的驱动板(标签-stm32|关键词-m3)
  • ¥15 信息管理系统的查找和排序
  • ¥15 基于STM32,电机驱动模块为L298N,四路运放电磁传感器,三轮智能小车电磁组电磁循迹(两个电机,一个万向轮),怎么用读取的电磁传感器信号表示小车所在的位置