斯丢彼得 2023-04-10 16:03 采纳率: 100%
浏览 135
已结题

python 对csv 文件进行批处理

Python 批量处理csv 文件问题求帮助!@
ShowMeAI@ 带脑子的程序猿

img

img

img

如图所示,在同一个文件夹下有下列csv文件,其文件名构成为:“SN_”+“测试时间_”+“_Defect_CMreport.csv”, 部分文件SN相同,测试时间不同.
打开CSV 后,每一个csv 第七列第二行信息为测试结果信息.

目标:

  1. 测试时间部分为24小时制计数,10点之前的0-9点为1位字符,10-24点为2位字符,请统一至 “00,01,02,03 等” 对齐文件名字符数.
  2. 根据SN部分,筛选重复部分,若该SN只出现过一次,复制到target 文件夹,若出现过多次,按文件名中的测试时间排序,仅将最后一次测试时间对应的SN 复制到target 文件夹.
  3. 同一SN,至多只测试5次,只少测试过一次,将测试结果根据文件名中的时间先后顺序排序,生成如图3统计,并生成 summary.csv 文件,存至target 文件.

感谢各位关注,初学统计,面对大数据量的处理,寻求高效能方法.

  • 写回答

8条回答 默认 最新

  • 独处东汉 2023-04-10 22:58
    关注

    占坑答题,先答复你的第一个小目标,每天搞一个小目标,按照统一的时间的格式来对文件名进行操作,假设source文件夹下有几个csv的文件,以下是代码片段:

    import os
    
    folder_path = '..\source'  # 文件夹路径 可以使用相对路径也可以使用绝对路径
    for filename in os.listdir(folder_path):
        name, ext = os.path.splitext(filename)
        if ext == '.csv':
            time_str = name.split('_')[2]  # 获取时间字符串
            h, m, s = time_str.split('-')  # 分割小时、分钟和秒
            new_time_str = f'{h.zfill(2)}-{m.zfill(2)}-{s.zfill(2)}'  # 构造新的时间字符串
            new_name = name.replace(time_str, new_time_str) + ext  # 构造新的文件名
            os.rename(os.path.join(folder_path, filename), os.path.join(folder_path, new_name))
    

    img

    img

    向第二个目标进发:
    假设你的python的工程文件夹下有两个文件夹,一个是source,一个target,其中target的文件夹是什么都没有的,source文件夹下的内容如下:

    img

    现在要实现你的第二需求,代码片段如下:

    import os
    import shutil
    from datetime import datetime
    
    source_folder = 'source'  # 源文件夹
    target_folder = 'target'  # 目标文件夹
    
    latest_files = {}  # 每个 SN 对应的最新文件
    
    # 实现目标1,统一格式
    for filename in os.listdir(source_folder): # 文件夹路径,使用相对路径
        name, ext = os.path.splitext(filename)
        if ext == '.csv':
            time_str = name.split('_')[2]  # 获取时间字符串
            h, m, s = time_str.split('-')  # 分割小时、分钟和秒
            new_time_str = f'{h.zfill(2)}-{m.zfill(2)}-{s.zfill(2)}'  # 构造新的时间字符串
            new_name = name.replace(time_str, new_time_str) + ext  # 构造新的文件名
            os.rename(os.path.join(source_folder, filename), os.path.join(source_folder, new_name))
    
    # 实现目标2,文件搬运        
    # 遍历源文件夹中的所有文件
    for file_name in os.listdir(source_folder):
        sn = file_name.split('_')[0]  # 获取文件名中的 SN
        time_str = file_name.split('_')[1] + '_' + file_name.split('_')[2]  # 获取文件名中的时间字符串
        time = datetime.strptime(time_str, '%Y%m%d_%H-%M-%S')  # 将时间字符串转换为 datetime 对象
        if sn not in latest_files or time > latest_files[sn][1]:  # 如果当前 SN 还没有对应的最新文件,或者当前文件的时间比已知的最新时间更晚
            latest_files[sn] = (file_name, time)  # 更新最新文件和最新时间
    
    # 遍历每个 SN 对应的最新文件
    for sn, (latest_file, latest_time) in latest_files.items():
        shutil.copy(f'{source_folder}/{latest_file}', f'{target_folder}/{latest_file}')  # 拷贝最新文件到目标文件夹中
    

    运行之后:

    img

    img

    第三天来实现需求3,全部代码如下:

    import os
    import shutil
    import csv
    
    from datetime import datetime
    from collections import defaultdict
    
    source_folder = 'source'  # 源文件夹
    target_folder = 'target'  # 目标文件夹
     
    latest_files = {}  # 每个 SN 对应的最新文件
     
    # 实现目标1,统一格式
    for filename in os.listdir(source_folder): # 文件夹路径,使用相对路径
        name, ext = os.path.splitext(filename)
        if ext == '.csv':
            time_str = name.split('_')[2]  # 获取时间字符串
            h, m, s = time_str.split('-')  # 分割小时、分钟和秒
            new_time_str = f'{h.zfill(2)}-{m.zfill(2)}-{s.zfill(2)}'  # 构造新的时间字符串
            new_name = name.replace(time_str, new_time_str) + ext  # 构造新的文件名
            os.rename(os.path.join(source_folder, filename), os.path.join(source_folder, new_name))
     
    # 实现目标2,文件搬运        
    # 遍历源文件夹中的所有文件
    for file_name in os.listdir(source_folder):
        sn = file_name.split('_')[0]  # 获取文件名中的 SN
        time_str = file_name.split('_')[1] + '_' + file_name.split('_')[2]  # 获取文件名中的时间字符串
        time = datetime.strptime(time_str, '%Y%m%d_%H-%M-%S')  # 将时间字符串转换为 datetime 对象
        if sn not in latest_files or time > latest_files[sn][1]:  # 如果当前 SN 还没有对应的最新文件,或者当前文件的时间比已知的最新时间更晚
            latest_files[sn] = (file_name, time)  # 更新最新文件和最新时间
     
    # 遍历每个 SN 对应的最新文件
    for sn, (latest_file, latest_time) in latest_files.items():
        shutil.copy(f'{source_folder}/{latest_file}', f'{target_folder}/{latest_file}')  # 拷贝最新文件到目标文件夹中
        
    # 实现目标3,实现文件内容摘取  
    # 创建一个字典来存储每个SerialNumber的Panel_Grade值
    serialnumber_panelgrade = defaultdict(list)
    
    # 获取文件夹中的所有文件名
    filenames = os.listdir(target_folder)
    
    # 遍历文件夹中的每个csv文件
    for filename in filenames:
        if filename.endswith('.csv'):
            with open(os.path.join(target_folder, filename), newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    serialnumber = row['SerialNumber']
                    panel_grade = row['Panel_Grade']
                    if panel_grade:
                        serialnumber_panelgrade[serialnumber].append(panel_grade)
    
    # 创建一个新的csv文件
    with open('./target/summary.csv', 'w', newline='') as csvfile:
        fieldnames = ['SerialNumber', 'Panel_Grade_test1', 'Panel_Grade_test2', 'Panel_Grade_test3', 'Panel_Grade_test4']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for serialnumber, panel_grades in serialnumber_panelgrade.items():
            row = {'SerialNumber': serialnumber}
            for i in range(1, 5):
                if i <= len(panel_grades):
                    row[f'Panel_Grade_test{i}'] = panel_grades[i-1]
                else:
                    row[f'Panel_Grade_test{i}'] = 0
            writer.writerow(row)
    

    img


    target文件夹:

    img

    运行结果:

    img

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
  • 创意程序员 2023-04-10 16:14
    关注

    只能用Python吗?用Shell可能更简单

    评论
  • 萌萌哒蕗甴噐 2023-04-10 16:15
    关注

    要使用Python对CSV文件进行批处理,您可以使用内置的CSV模块和一些其他有用的库,例如Pandas。

    下面是使用CSV模块进行批处理的一个示例代码:

    
    ```c#
    
    import csv
    
    # 批处理的函数
    def batch_process(file_path):
        with open(file_path, 'r') as csv_file:
            reader = csv.reader(csv_file)
            # 逐行读取CSV文件并进行处理
            for row in reader:
                # 在这里添加您的处理代码
                print(row)
    
    # 调用函数并传入CSV文件路径
    batch_process('example.csv')
    
    
    使用Pandas库进行批处理的示例代码:
    ```c#
    import pandas as pd
    
    # 读取CSV文件为DataFrame对象
    df = pd.read_csv('example.csv')
    
    # 批处理的函数,应用于DataFrame对象的每一行
    def process_row(row):
        # 在这里添加您的处理代码
        print(row)
    
    # 将处理函数应用于DataFrame对象的每一行
    df.apply(process_row, axis=1)
    
    
    

    请注意,在处理大型CSV文件时,使用Pandas库可能会更有效率,因为它可以在内存中处理大量数据。

    评论
  • zuogle 2023-04-10 17:35
    关注

    还请提供一下源文件

    评论
  • 蓝白、波罗歌 2023-04-10 18:47
    关注

    以下答案由GPT-3.5大模型与博主波罗歌共同编写:
    这是一个比较复杂的任务,需要读取多个csv文件并对它们进行处理和分析。下面是一个可能的解决方案,需要借助Python中的一些库,比如pandas和os。以下是一个大概的代码框架,可以根据具体情况进行修改和完善。

    import pandas as pd
    import os
    
    source_dir = 'input/'             # 存放输入文件的文件夹
    target_dir = 'output/'            # 存放输出文件的文件夹
    summary_file = 'summary.csv'      # 存放统计数据的文件名
    
    # 列名
    columns = ['SN', '测试时间', '测试结果']
    
    # 遍历所有csv文件
    for file_name in os.listdir(source_dir):
        if file_name.endswith('.csv'):
            
            # 读取csv文件
            df = pd.read_csv(os.path.join(source_dir, file_name), header=1, names=columns)
            
            # 处理测试时间格式
            # TODO
            
            # 按SN进行分组,存储最后一次测试结果
            df = df.groupby('SN', as_index=False).agg('last')
            
            # 只保留测试结果,按测试时间排序
            df = df[['SN', '测试结果']].sort_values('测试时间')
            
            # 写入文件
            df.to_csv(os.path.join(target_dir, file_name), index=False)
            
    # 统计测试结果
    # TODO
    
    # 写入summary.csv文件
    # TODO
    

    具体步骤如下:

    1. 遍历存放输入文件的文件夹,对于每个csv文件,读取数据并保存在一个pandas DataFrame对象中;

    2. 处理测试时间格式,将10点之前的时间转换为1位字符,10点及以后的时间转换为2位字符;可以使用字符串的切片操作来实现;

    3. 按SN进行分组,保留每个SN最后一次的测试结果;可以使用pandas的groupby函数来实现;

    4. 只保留测试结果信息,并按照测试时间对数据进行排序;

    5. 将处理得到的数据写入到输出文件夹中,文件名与输入文件名相同;

    6. 统计每个测试结果的数量,并将数据写入到summary.csv文件中。可以使用pandas的groupby和agg函数来实现。

    需要注意的是,由于数据量较大,可能需要使用一些高效的处理方式,比如尽量避免使用循环,使用pandas自带的函数进行数据操作等。另外,需要根据具体情况进行调整,比如处理时间格式的方式,统计测试结果的方式等。
    如果我的回答解决了您的问题,请采纳!

    评论
  • Zyb0627 2023-04-10 20:25
    关注

    引用chatGPT作答,你可以使用 Python 的内置模块来处理 CSV 文件和文件名。下面是一些提示可以帮助你解决这个问题:

    1.使用 os 模块来遍历文件夹中的所有文件,并使用 csv 模块打开 CSV 文件。

    2.你可以使用 os.path.splitext() 函数来获取文件名和扩展名。

    3.使用 datetime 模块来处理日期和时间。你可以使用 datetime.strptime() 函数将字符串转换为日期时间对象,并使用 strftime() 函数将日期时间对象转换为指定格式的字符串。

    4.使用 Python 的字典数据类型来存储每个 SN 的测试结果信息。对于每个 SN,你可以创建一个字典对象,其中键是测试时间,值是测试结果。

    5.使用 sorted() 函数对测试时间进行排序。你可以使用 lambda 表达式指定要排序的键。

    6.你可以使用 csv 模块中的 DictWriter 类来将测试结果写入 summary.csv 文件中。

    7.最后,使用 shutil 模块来复制文件到目标文件夹中。

    以下是一些示例代码,供你参考:

    import os
    import csv
    import datetime
    import shutil
    
    # 用于存储每个 SN 的测试结果信息的字典
    results = {}
    
    # 遍历文件夹中的所有文件
    for file in os.listdir('.'):
        # 检查文件名是否符合要求
        if file.startswith('SN_') and file.endswith('_Defect_CMreport.csv'):
            # 获取文件名和扩展名
            filename, ext = os.path.splitext(file)
            # 解析测试时间
            test_time = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d%H%M%S')
            # 格式化测试时间为指定格式的字符串
            test_time_str = test_time.strftime('%H').lstrip('0') + test_time.strftime('%M')
            # 重命名文件
            new_filename = f'SN_{test_time.strftime("%Y%m%d_")}{test_time_str}_Defect_CMreport.csv'
            os.rename(file, new_filename)
            # 解析 SN
            sn = filename.split('_')[1]
            # 检查该 SN 是否已经存在于字典中
            if sn not in results:
                results[sn] = {}
            # 打开 CSV 文件并读取测试结果信息
            with open(new_filename, 'r') as f:
                reader = csv.reader(f)
                next(reader)  # 跳过第一行标题
                next(reader)  # 跳过第二行空行
                test_result = next(reader)[6]  # 读取测试结果信息
            # 存储测试结果信息到字典中
            results[sn][test_time] = test_result
    
    # 遍历每个 SN 的测试结果信息
    for sn, result in results.items():
        # 如果该 SN 只测试过一次,复制文件到目标文件夹中
        if len(result) == 1:
            filename = list(result.keys())[0]
            shutil.copy(f'SN_{sn}_{filename.strftime("%Y%m%d_%H%M%S")}_Defect_CMreport.csv', 'target')
        else:
        # 如果该 SN 测试过多次,按测试时间排序,仅将最后一次测试时间对应的 SN 复制到目标文件夹中
        sorted_result = sorted(result.items(), key=lambda x: x[0])
        filename = sorted_result[-1][0]
        shutil.copy(f'SN_{sn}_{filename.strftime("%Y%m%d_%H%M%S")}_Defect_CMreport.csv', 'target')
        # 根据文件名中的时间先后顺序排序测试结果
        sorted_result = sorted(result.items(), key=lambda x: x[0])
        # 生成统计信息
        statistics = []
        for i, (test_time, test_result) in enumerate(sorted_result):
            statistics.append({
                '序号': i+1,
                '测试时间': test_time.strftime('%Y-%m-%d %H:%M:%S'),
                '测试结果': test_result
            })
        # 将统计信息写入 summary.csv 文件中
        with open(os.path.join('target', f'{sn}_summary.csv'), 'w', newline='') as f:
            fieldnames = ['序号', '测试时间', '测试结果']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for row in statistics:
                writer.writerow(row)
    
    评论
  • Leodong. 2023-04-10 21:27
    关注

    该回答通过自己思路及引用到GPTᴼᴾᴱᴺᴬᴵ搜索,得到内容具体如下:
    为了实现这个目标,你需要编写一个 Python 脚本。以下是一个可能的解决方案,供参考:

    import os
    import shutil
    import csv
    from collections import defaultdict
    
    # 配置参数
    data_dir = './data'     # 数据文件夹
    target_dir = './target' # 目标文件夹
    
    # 定义辅助函数
    
    def get_sn(file_name):
        """从文件名中提取 SN"""
        return file_name.split('_')[1]
    
    def get_test_time(file_name):
        """从文件名中提取测试时间"""
        return file_name.split('_')[2][:8]
    
    def align_test_time(test_time_str):
        """将测试时间对齐"""
        hour_str = test_time_str[-2:]
        if int(hour_str) < 10:
            test_time_str = test_time_str[:-2] + '0' + hour_str
        return test_time_str
    
    # 处理数据
    
    # 存储 SN 对应的测试结果信息
    sn_to_results = defaultdict(list)
    
    # 遍历文件夹中的所有文件
    for file_name in os.listdir(data_dir):
        if file_name.endswith('.csv') and 'Defect_CMreport' in file_name:
            file_path = os.path.join(data_dir, file_name)
            with open(file_path, 'r') as f:
                reader = csv.reader(f)
                # 获取测试结果信息
                results = list(reader)[1][6]
            # 提取 SN 和测试时间
            sn = get_sn(file_name)
            test_time = get_test_time(file_name)
            # 对齐测试时间
            test_time = align_test_time(test_time)
            # 存储测试结果信息
            sn_to_results[sn].append((test_time, results))
    
    # 存储最后一次测试时间对应的 SN
    last_tested_sn = {}
    
    # 遍历 SN 和测试结果信息
    for sn, results_list in sn_to_results.items():
        if len(results_list) == 1:
            # 如果 SN 只测试过一次,直接复制到目标文件夹
            file_name = f'SN_{sn}_{results_list[0][0]}_Defect_CMreport.csv'
            source_path = os.path.join(data_dir, file_name)
            target_path = os.path.join(target_dir, file_name)
            shutil.copy(source_path, target_path)
            # 存储最后一次测试时间对应的 SN
            last_tested_sn[sn] = results_list[0][0]
        else:
            # 如果 SN 测试过多次,按测试时间排序,复制最后一次到目标文件夹
            sorted_results_list = sorted(results_list, key=lambda x: x[0])
            last_tested_time, last_tested_results = sorted_results_list[-1]
            file_name = f'SN_{sn}_{last_tested_time}_Defect_CMreport.csv'
            source_path = os.path.join(data_dir, file_name)
            target_path = os.path.join(target_dir, file_name)
            shutil.copy(source_path, target_path)
            # 存储最后一次测试时间对应的 SN
            last_tested_sn[sn] = last_tested_time
    
    # 统计测试结果信息并存储到 summary.csv 文件中
    with open(os.path.join(target_dir, 'summary.csv'), 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['SN', '测试次数', '测试时间', '测试结果'])
        for sn, last_tested_time in last_tested_sn.items():
            # 取出该 SN 所有测试结果信息,按测试时间排序
            results_list = sn_to_results[sn]
            sorted_results_list = sorted(results_list, key=lambda x: x[0])
            # 统计测试次数和测试结果
            test_count = len(results_list)
            test_results = [x[1] for x in sorted_results_list]
            # 写入 summary.csv 文件
            writer.writerow([sn, test_count, last_tested_time, ' '.join(test_results)])
    

    这个脚本首先遍历数据文件夹中的所有文件,提取出 SN、测试时间和测试结果信息,并将它们存储到一个字典中。然后,根据 SN 的测试次数和最后一次测试时间,将文件复制到目标文件夹中。最后,统计测试结果信息并将它们写入 summary.csv 文件中。


    如果以上回答对您有所帮助,点击一下采纳该答案~谢谢

    评论
  • 阿里嘎多学长 2023-04-10 22:10
    关注

    以下内容部分参考ChatGPT模型:


    首先,我们需要使用Python中的csv模块来读取和处理csv文件。为了批量处理csv文件,我们可以使用Python的os模块来列出指定文件夹中所有的csv文件,并使用循环来逐个处理每个文件。

    以下是一个示例代码,可以读取指定文件夹中所有的csv文件并将它们合并成一个大的csv文件:

    import csv
    import os
    
    # 指定文件夹路径
    folder_path = '/path/to/csv/folder/'
    
    # 获取文件夹中所有csv文件的文件名
    csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
    
    # 创建一个空的列表,用于存储所有csv文件的数据
    all_data = []
    
    # 循环遍历每个csv文件,读取数据并添加到all_data列表中
    for csv_file in csv_files:
        with open(os.path.join(folder_path, csv_file), 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader)  # 跳过csv文件的第一行(标题行)
            for row in reader:
                all_data.append(row)
    
    # 将所有数据写入一个新的csv文件中
    with open('merged.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['SN', '测试时间', 'Defect_CM'])  # 写入新csv文件的标题行
        writer.writerows(all_data)
    

    需要注意的是,这个示例代码假设所有的csv文件都有相同的列名,并且第一行为标题行。如果不是这种情况,需要根据具体情况对代码进行修改。


    如果我的建议对您有帮助、请点击采纳、祝您生活愉快

    评论
查看更多回答(7条)

报告相同问题?

问题事件

  • 系统已结题 4月20日
  • 已采纳回答 4月12日
  • 创建了问题 4月10日

悬赏问题

  • ¥15 Qt 不小心删除了自带的类,该怎么办
  • ¥15 我需要在PC端 开两个抖店工作台客户端.(语言-java)
  • ¥15 有没有哪位厉害的人可以用C#可视化呀
  • ¥15 可以帮我看看代码哪里错了吗
  • ¥15 设计一个成绩管理系统
  • ¥15 PCL注册的选点等函数如何取消注册
  • ¥15 问一下各位,为什么我用蓝牙直接发送模拟输入的数据,接收端显示乱码呢,米思齐软件上usb串口显示正常的字符串呢?
  • ¥15 Python爬虫程序
  • ¥15 crypto 这种的应该怎么找flag?
  • ¥15 代码已写好,求帮我指出错误,有偿!