2201_75335496 2024-07-24 17:21 采纳率: 76.5%
浏览 6

python报错runtimeerror:dictionary changed size during iteration

这是最近写的库的一部分内容:

MoldelName = "tableOut"

import os,sys
import io
import re
import time
import random
#from colorama import Fore,Back,Style,coloramaInit

outOverwritten = False # 输出时是否覆盖输出,如否当出现覆盖时抛出异常
block_interval = 20 # 每格的横轴宽度(字符)
out_sep_time   = 0  # 每次刷新屏幕的时间间隔
autoFlush = True

"""flushType类型:
real 实时:
    实时刷新所有屏幕内容,会清空屏幕
    适合用于导出输出后查看
segm 分段:
    分段输出内容,较快且易于观察实时输出
    适合实时查看输出的情况
"""
flushType = "real"
flush_typeReal_pringList = []

stdout = {}

class FlushTypeError(Exception):
    ErrText = "\"flushType\" %s is not different."

class NoInitError(Exception):
    ErrText = f"No initialization has been performed, please call the \"init\" function to initialize {MoldelName}!"
    is_init = False

class InternalError(Exception):
    ErrText_stdout = "\tError in variable \"stdout\", unknown type \"%s\".\n\t\tPerhaps due to incorrect type input when customizing the \"stdout\" variable."

class OutputRepetition(Exception):
    ErrText = "Output overwritten, please increase timestamp accuracy or specify the 'outOverwritten' variable as True."

class Order:
    def __init__(self):
        """
        序号类型:
            stamp      时间戳:
             ┣time.time时间戳
             ┗如为这个类型,self.accuracy即为时间戳精度
            num        数字序列:
             ┣从self.num_start开始
             ┗如为这个类型,self.num_start即为序列起始序号
            rand       随机数:
             ┣random.randint随机数
             ┣如为这个类型,self.rand_range即为随机范围,
             ┗do_rand_no_repet即可使随机数不重复(见下文)
            randForNum 随机数:
             ┣同rand类型
             ┣如为这个类型,在rand类型的基础上(do_rand_no_repet不起效),
             ┗会在被重复使用的数字后添加序号
        """
        self.type = "stamp" # 序号类型0
        self.accuracy = 1 # 正整数代表小数点后位数,负数代表小数点前位数
        self.num_start = 1 # 序列起始序号
        self.rand_range = (1000,9999) # 随机范围
        """
        is_rand_down_do操作类型:
            raise    抛出错误
            whileNum 重新使用所有随机数并在后面添加序号
            while    重新使用所有随机数但不在后面添加序号
        """
        self.is_rand_down_do = "raise" # 当随机数用完时进行的操作
        self.do_rand_no_repet = True # 如为True可使随机数不重复,但当所有数字即将用完时会产生延时,用完后根据self.is_rand_down_do确定操作
        
        self.num_list = self.num_start
        self.rand_usenumlist = []
        self.rand_usedownnumdir = {}
        self.randForNum_usenumdir = {}
        
    def getOrder(self):
        if self.type == "stamp":
            return round(time.time(),self.accuracy)
        elif self.type == "num":
            self.num_list += 1
            return self.num_list - 1
        elif self.type == "rand":
            r = random.randint(*self.rand_range)
            if not self.do_rand_no_repet:
                return r
            else:
                while (r in self.rand_usenumlist):
                    if len(self.rand_usenumlist) >= len(list(range(*self.randrange))) + 1:
                        self.doRandDown()
                    r = random.randint(*self.rand_range)
                self.rand_usenumlist.append(r)
                if len(self.rand_usedownnumdir):
                    self.rand_usedownnumdir[r] += 1
                    r = f"{r} ({self.rand_usedownnumdir[r]})"
                return r
        elif self.type == "randForNum":
            r = random.randint(*self.rand_range)
            if r in self.randForNum_usenumdir:
                self.randForNum_usenumdir[r] += 1
            else:
                self.randForNum_usenumdir[r] = 1
            return f"{r} ({self.randForNum_usenumdir[r]})"
        else:
            return "No Orders"
    
    def doRandDown(self):
        if self.is_rand_down_do == "raise":
            class RandNumUsageError(Exception):ErrText = "Random numbers used up!"
            raise RandNumUsageError(RandNumUsageError.ErrText)
        elif self.is_rand_down_do == "whileNum":
            print("Wait for rand-whileNum Mod doing...",end = "")
            for i in self.randrange:
                self.rand_usedownnumdir[i] = 1
            print("\r",end = "")
        elif self.is_rand_down_do == "while":
            self.rand_usenumlist = []

order = Order()

class Std_:
    def __init__(self,type):
        self.type = type
    def write(self,text,):
        if NoInitError.is_init == False:
            raise NoInitError(NoInitError.ErrText)
        timestamp = order.getOrder()
        try:
            stdout[timestamp].append({"type":self.type,"text":text})
        except:
            stdout[timestamp] = [{"type":self.type,"text":text}]
        if autoFlush:
            flush()
class Stdinfo(Std_):pass
class Stderr(Std_):pass
class Stdwarn(Std_):pass
class Stddat(Std_):pass

stdinfo = Stdinfo("info")
stdwarn = Stdwarn("warn")
stderr  = Stderr("err")
stddat  = Stddat("dat")

class Stderr_take(io.TextIOBase):
    def __init__(self):
        self.content = ""
    def write(self,text):
        stderr.write(text)
stderr_take = Stderr_take()

#sys.stdout = Stdout()

class DataStat:
    def __init__(self):
        self.init_time = 0
        self.flush_average_time = 0
ds = DataStat()
def quit(info = True,tr = True):
    global os,sys,time,io,re,random,stdout
    if flushType == "segm":
        print("\r" + end)
    if info:
        print("╔" + ("═" * (interval + 4) * len(showlist)) + "╗\n║Info:" + (" " * ((interval + 4) * len(showlist) - 5)) + f"║\n║\tinit_time:{ds.init_time}" + (" " * ((interval + 4) * len(showlist) - 17 - len(str(ds.init_time)))) + f"║\n║\tflush_average_time:{ds.flush_average_time}" + (" " * ((interval + 4) * len(showlist) - 26 - len(str(ds.flush_average_time)))) + "║")
        print("╚" + ("═" * (interval + 4) * len(showlist)) + "╝\nWait for quit...")
    del os,sys,time,io,re,random,stdout
    if tr:
        return ds

def cls():
    os.system('cls' if os.name == 'nt' else 'clear')
    if out_sep_time:
        time.sleep(out_sep_time)

def cutstring(t,l):
    t = t.replace("\n","\\n").replace("\t","    ").replace("\r","\\r")
    tl = [t[i:i + l] for i in range(0,len(t),l)]
    if len(tl[-1]) < l:
        tl[-1] = tl[-1] + " " * (l - len(tl[-1]))
    return tl

def gettextblock(k,v):
    interval = block_interval
    text = [0 for _ in range(5)]
    if len(v) > len(showlist) and not outOverwritten:
        raise OutputRepetition(OutputRepetition.ErrText)
    for i in v:
        if "order" in showlist:
            text[0] = str(k) + " " * (interval - len(str(k)) + 3)
        if "info" in showlist and i["type"] == "info":
            text[1] = cutstring(i["text"],interval + 3)
        if "warn" in showlist and i["type"] == "warn":
            text[2] = cutstring(i["text"],interval + 3)
        if "err" in showlist and i["type"] == "err":
            text[3] = cutstring(i["text"],interval + 3)
        if "dat" in showlist and i["type"] == "dat":
            text[4] = cutstring(i["text"],interval + 3)
        if i["type"] not in showlist:
            raise InternalError(InternalError.ErrText_stdout % i["type"])
    #print(text)
    
    """
    textmaxline = max(
                   (len(text[1]) if type(text[1]) == list else 0),
                   (len(text[2]) if type(text[2]) == list else 0),
                   (len(text[3]) if type(text[3]) == list else 0),
                   (len(text[4]) if type(text[4]) == list else 0)
                  )
    """
    #优化后的textmaxline算法
    textmaxline = max((len(i) if type(i) == list else 0) for i in text[1:])
    #修复bug,使没有输出的格中填充空格
    text = [([" " * (interval + 3) for _ in range(textmaxline)] if i == 0 else i) for i in text]
    #print(textmaxline)
    blocktext = []
    #return
    for i in range(textmaxline):
        blocktext_il = [text[0]] if not i else [" " * (interval + 3) for _ in range(textmaxline)]
        for o in range(1,5):
            #if type(text[o]) == list:
                if len(text[o]) > i:
                    blocktext_il.append(text[o][i])
        # 修复第二行后前面多出一列的bug(对timestamp及每行有多个显示内容时基本无效)
        if len(blocktext_il) >= len(showlist):
            blocktext_il = blocktext_il[len(blocktext_il) - len(showlist):]
        blocktext.append(blocktext_il)
        del blocktext_il# 释放资源
    
    timestamp_recut = cutstring(blocktext[0][0],interval + 4)
    for i in range(len(timestamp_recut)):
        if len(blocktext) >= i:
            blocktext[i][0] = timestamp_recut[i]
    
    if len(blocktext) > len(timestamp_recut):
        for i in range(len(timestamp_recut),len(blocktext)):
            blocktext[i][0] = " " * (interval + 4)
    #print(blocktext)
    #return
    del text,timestamp_recut # 释放资源
    
    #print("blocktext:")
    #print(blocktext)
    #return
    text = "\n".join(["┃" + "│".join(i) + "┃" for i in blocktext])
    #print("text:")
    #print(text)
    return text

def flush(segmType_k = None):
    stdoutl = stdout
    if NoInitError.is_init == False:
        raise NoInitError(NoInitError.ErrText)
    flush_start = time.time()
    if flushType == "real":
        text = []
        for k,v in stdoutl.items():
            text.append(gettextblock(k,v))
        cls()
        print("\n".join([head,*("".join([i,("\n" if text.index(i) != len(text) - 1 else ""),(sep if text.index(i) != len(text) - 1 else "")]) for i in text),end]))
    elif flushType == "segm":
        if "title" not in flush_typeReal_pringList:
            cls()
            print(head,end = "")
            flush_typeReal_pringList.append("title")
        else:
            if segmType_k:
                print("\n" + gettextblock(segmType_k,stdoutl[segmType_k]) + "\n" + sep,end = "")
                flush_typeReal_pringList.append(segmType_k)
            for k,v in stdoutl.items():
                if k not in flush_typeReal_pringList:
                    print("\n" + gettextblock(k,v) + "\n" + sep,end = "")
                    flush_typeReal_pringList.append(k)
    else:
        raise FlushTypeError(FlushTypeError.ErrText % flushType)
    if ds.flush_average_time:
        ds.flush_average_time = (ds.flush_average_time + time.time() - flush_start) / 2
    else:
        ds.flush_average_time = time.time() - flush_start
"""
当datwrite之前的没有输出,dat会输出到前面而不是最后一列
"""


def init(timeshow = True,info = True,err = True,warn = True,dat = True,takeStderr = True):
    global end,head,sep,showlist,interval
    if NoInitError.is_init:return
    
    if takeStderr:
        global Sys_stderr
        Sys_stderr = sys.stderr
        sys.stderr = stderr_take
    
    init_start = time.time()
    interval = block_interval
    fit = "┏"
    title = ""
    sep = "┣"
    end = "┗"
    showlist = []
    titletextlen = []
    titlelen = 0
    if timeshow:
        titlelen += 1
        titletextlen.append(5)
        showlist.append("order")
        title += "order" + " " * (interval - 1)
    if info:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("info")
        title += "│info" + " " * (interval - 1)
    if warn:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("warn")
        title += "│warn" + " " * (interval - 1)
    if err:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("err")
        title += "│err " + " " * (interval - 1)
    if dat:
        titlelen += 1
        titletextlen.append(3)
        showlist.append("dat")
        title += "│dat " + " " * (interval - 1)
    
    for i in range(titlelen):
        fit += "━" * (interval - 1 + titletextlen[i]) + ("┯" if titlelen - 1 != i else "━")
    fit += "┓"
    
    for i in range(titlelen):
        sep += "━" * (interval - 1 + titletextlen[i]) + ("┿" if titlelen - 1 != i else "━")
    sep += "┫"
    
    for i in range(titlelen):
        end += "━" * (interval - 1 + titletextlen[i]) + ("┷" if titlelen - 1 != i else "━")
    end += "┛"
    
    head = "".join([fit,"\n┃",title,"┃\n",sep])
    #coloramaInit()
    cls()
    print(head + "\n" + end,end = "\n")
    
    NoInitError.is_init = True
    ds.init_time = time.time() - init_start

"""
# 测试用
stdout = {"1":[
               {"type":"info","text":"1infotext..........." * 2},
               {"type":"warn","text":"1warntext..........." * 2},
               {"type":"err","text":"1errtext............" * 2}
              ],
          "2":[
               {"type":"info","text":"2infotext..........." * 2},
               {"type":"warn","text":"2warntext..........." * 2},
               {"type":"dat","text":"2dattext............" * 2}
              ]
         }
"""
#block_interval = 11
"""
stdout[3] = [{"type":"info","text":"adtjkvd" * 3},
             {"type":"dat","text":"adtjkvd" * 3}]
"""
if __name__ == "__main__":
    init()
    order.accuracy = 6
    stderr.write("nihao" * 2)
    stdinfo.write("nihao " * 5)
    flush()
"""
win = tk.Tk()
win.title("Python_Std")

win.mainloop()"""

这是调用时的文件:

import tableOut as psr,random as r
from threading import Thread as td
psr.order.accuracy = 4
psr.order.type = "num"
psr.order.rand_range = (10,10)
psr.block_interval = 20
#psr.outOverwritten = True
#psr.autoFlush = False
psr.out_sep_time = 0.001
psr.flushType = "segm"
psr.init(takeStderr = 1)
def getstr():
    t = ""
    for i in range(r.randint(10,29)):
        t += chr(r.randint(65,122))
    return t
raisevar = 0
def raiseErr():
    global raisevar
    raisevar += 1
    t = 1 / 0
    print(t)
for i in range(30):
    # 17.5s
    psr.stdinfo.write("infowrite " + str(i) + getstr())
    #psr.stdwarn.write("warnwrite " + str(i) + getstr())
    #psr.stddat.write("datwrite " + str(i) + getstr())
    psr.stderr.write("errwrite" + str(i) + getstr())
    #psr.flush()
    if i == 1:
        ttd = td(target = raiseErr)
        ttd.start()
    """
    print("infowrite " + str(i) + getstr())
    print("warnwrite " + str(i) + getstr())
    print("datwrite " + str(i) + getstr())
    print("errwrite" + str(i) + getstr())"""

psr.quit()
print(raisevar)

该调用的文件是想测试将sys.stderr重定向到自定义类并写入stdout字典,可是报错runtimeerror:dictionary changed size during iteration,请问如何解决该问题?(我尝试在flush函数中使用临时字典可是还是报错,由于重定向到自定义类了,但是该类报错导致错误在程序最后才输出了部分内容,没有报错行数及位置,虽然已知flush函数没有问题,所以不知道哪里有问题)

  • 写回答

2条回答 默认 最新

  • 阿里嘎多学长 2024-07-24 17:21
    关注

    以下内容由AIGC及阿里嘎多学长共同生成、有用望采纳:


    在你提供的代码中,错误 RuntimeError: dictionary changed size during iteration 通常发生在遍历字典的同时,字典的大小发生了改变。在你的 flush 函数中,你正在遍历 stdout 字典,并且可能在遍历过程中修改了它,这会导致迭代器失效。

    以下是一些可能的解决方案:

    1. 使用字典的副本进行迭代: 在 flush 函数中,你可以先创建 stdout 字典的一个副本,然后在该副本上进行迭代,这样即使在迭代过程中 stdout 字典的大小发生了变化,也不会影响迭代器。

    2. 收集要删除或添加的键值对: 如果你需要在迭代过程中修改字典,可以先收集所有需要添加或删除的键值对,然后在迭代结束后应用这些更改。

    3. 使用线程安全的方法: 如果你的代码在多线程环境下运行,确保对 stdout 字典的所有访问都是线程安全的。这可能意味着使用锁或其他同步机制来防止多个线程同时修改字典。

    4. 检查异常处理: 确保在 flush 函数中没有未捕获的异常,这些异常可能会导致程序流程的意外改变,从而影响字典的大小。

    5. 使用生成器: 如果你的 flush 函数逻辑允许,可以使用生成器来逐步处理每个条目,而不是一次性迭代整个字典。

    以下是修改后的 flush 函数示例,使用字典的副本进行迭代:

    def flush(segmType_k=None):
        # 使用字典的副本进行迭代
        stdoutl = dict(stdout)  # 创建字典的浅拷贝
        flush_start = time.time()
        if NoInitError.is_init == False:
            raise NoInitError(NoInitError.ErrText)
        
        # 你的其他代码逻辑...
        # 确保在迭代过程中不直接修改 stdout 字典
        for k, v in stdoutl.items():
            # 处理每个条目...
            pass
    
        # 如果需要在迭代结束后修改 stdout 字典,可以在这里进行
        # 例如,根据需要删除或添加条目
    

    请注意,这只是一个示例,你可能需要根据你的程序的具体逻辑进行调整。如果你的 flush 函数中确实需要修改 stdout 字典,那么在迭代结束后再进行这些修改是一个好习惯。此外,如果你的程序使用了多线程,请确保使用适当的同步机制来保护共享资源。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 7月24日

悬赏问题

  • ¥20 wpf datagrid单元闪烁效果失灵
  • ¥15 券商软件上市公司信息获取问题
  • ¥100 ensp启动设备蓝屏,代码clock_watchdog_timeout
  • ¥15 Android studio AVD启动不了
  • ¥15 陆空双模式无人机怎么做
  • ¥15 想咨询点问题,与算法转换,负荷预测,数字孪生有关
  • ¥15 C#中的编译平台的区别影响
  • ¥15 软件供应链安全是跟可靠性有关还是跟安全性有关?
  • ¥15 电脑蓝屏logfilessrtsrttrail问题
  • ¥20 关于wordpress建站遇到的问题!(语言-php)(相关搜索:云服务器)