这是最近写的库的一部分内容:
MoldelName = "tableOut"
import os,sys
import io
import re
import time
import random
#from colorama import Fore,Back,Style,coloramaInit
outOverwritten = False # 输出时是否覆盖输出,如否当出现覆盖时抛出异常
block_interval = 20 # 每格的横轴宽度(字符)
out_sep_time = 0 # 每次刷新屏幕的时间间隔
autoFlush = True
"""flushType类型:
real 实时:
实时刷新所有屏幕内容,会清空屏幕
适合用于导出输出后查看
segm 分段:
分段输出内容,较快且易于观察实时输出
适合实时查看输出的情况
"""
flushType = "real"
flush_typeReal_pringList = []
stdout = {}
class FlushTypeError(Exception):
ErrText = "\"flushType\" %s is not different."
class NoInitError(Exception):
ErrText = f"No initialization has been performed, please call the \"init\" function to initialize {MoldelName}!"
is_init = False
class InternalError(Exception):
ErrText_stdout = "\tError in variable \"stdout\", unknown type \"%s\".\n\t\tPerhaps due to incorrect type input when customizing the \"stdout\" variable."
class OutputRepetition(Exception):
ErrText = "Output overwritten, please increase timestamp accuracy or specify the 'outOverwritten' variable as True."
class Order:
def __init__(self):
"""
序号类型:
stamp 时间戳:
┣time.time时间戳
┗如为这个类型,self.accuracy即为时间戳精度
num 数字序列:
┣从self.num_start开始
┗如为这个类型,self.num_start即为序列起始序号
rand 随机数:
┣random.randint随机数
┣如为这个类型,self.rand_range即为随机范围,
┗do_rand_no_repet即可使随机数不重复(见下文)
randForNum 随机数:
┣同rand类型
┣如为这个类型,在rand类型的基础上(do_rand_no_repet不起效),
┗会在被重复使用的数字后添加序号
"""
self.type = "stamp" # 序号类型0
self.accuracy = 1 # 正整数代表小数点后位数,负数代表小数点前位数
self.num_start = 1 # 序列起始序号
self.rand_range = (1000,9999) # 随机范围
"""
is_rand_down_do操作类型:
raise 抛出错误
whileNum 重新使用所有随机数并在后面添加序号
while 重新使用所有随机数但不在后面添加序号
"""
self.is_rand_down_do = "raise" # 当随机数用完时进行的操作
self.do_rand_no_repet = True # 如为True可使随机数不重复,但当所有数字即将用完时会产生延时,用完后根据self.is_rand_down_do确定操作
self.num_list = self.num_start
self.rand_usenumlist = []
self.rand_usedownnumdir = {}
self.randForNum_usenumdir = {}
def getOrder(self):
if self.type == "stamp":
return round(time.time(),self.accuracy)
elif self.type == "num":
self.num_list += 1
return self.num_list - 1
elif self.type == "rand":
r = random.randint(*self.rand_range)
if not self.do_rand_no_repet:
return r
else:
while (r in self.rand_usenumlist):
if len(self.rand_usenumlist) >= len(list(range(*self.randrange))) + 1:
self.doRandDown()
r = random.randint(*self.rand_range)
self.rand_usenumlist.append(r)
if len(self.rand_usedownnumdir):
self.rand_usedownnumdir[r] += 1
r = f"{r} ({self.rand_usedownnumdir[r]})"
return r
elif self.type == "randForNum":
r = random.randint(*self.rand_range)
if r in self.randForNum_usenumdir:
self.randForNum_usenumdir[r] += 1
else:
self.randForNum_usenumdir[r] = 1
return f"{r} ({self.randForNum_usenumdir[r]})"
else:
return "No Orders"
def doRandDown(self):
if self.is_rand_down_do == "raise":
class RandNumUsageError(Exception):ErrText = "Random numbers used up!"
raise RandNumUsageError(RandNumUsageError.ErrText)
elif self.is_rand_down_do == "whileNum":
print("Wait for rand-whileNum Mod doing...",end = "")
for i in self.randrange:
self.rand_usedownnumdir[i] = 1
print("\r",end = "")
elif self.is_rand_down_do == "while":
self.rand_usenumlist = []
order = Order()
class Std_:
def __init__(self,type):
self.type = type
def write(self,text,):
if NoInitError.is_init == False:
raise NoInitError(NoInitError.ErrText)
timestamp = order.getOrder()
try:
stdout[timestamp].append({"type":self.type,"text":text})
except:
stdout[timestamp] = [{"type":self.type,"text":text}]
if autoFlush:
flush()
class Stdinfo(Std_):pass
class Stderr(Std_):pass
class Stdwarn(Std_):pass
class Stddat(Std_):pass
stdinfo = Stdinfo("info")
stdwarn = Stdwarn("warn")
stderr = Stderr("err")
stddat = Stddat("dat")
class Stderr_take(io.TextIOBase):
def __init__(self):
self.content = ""
def write(self,text):
stderr.write(text)
stderr_take = Stderr_take()
#sys.stdout = Stdout()
class DataStat:
def __init__(self):
self.init_time = 0
self.flush_average_time = 0
ds = DataStat()
def quit(info = True,tr = True):
global os,sys,time,io,re,random,stdout
if flushType == "segm":
print("\r" + end)
if info:
print("╔" + ("═" * (interval + 4) * len(showlist)) + "╗\n║Info:" + (" " * ((interval + 4) * len(showlist) - 5)) + f"║\n║\tinit_time:{ds.init_time}" + (" " * ((interval + 4) * len(showlist) - 17 - len(str(ds.init_time)))) + f"║\n║\tflush_average_time:{ds.flush_average_time}" + (" " * ((interval + 4) * len(showlist) - 26 - len(str(ds.flush_average_time)))) + "║")
print("╚" + ("═" * (interval + 4) * len(showlist)) + "╝\nWait for quit...")
del os,sys,time,io,re,random,stdout
if tr:
return ds
def cls():
os.system('cls' if os.name == 'nt' else 'clear')
if out_sep_time:
time.sleep(out_sep_time)
def cutstring(t,l):
t = t.replace("\n","\\n").replace("\t"," ").replace("\r","\\r")
tl = [t[i:i + l] for i in range(0,len(t),l)]
if len(tl[-1]) < l:
tl[-1] = tl[-1] + " " * (l - len(tl[-1]))
return tl
def gettextblock(k,v):
interval = block_interval
text = [0 for _ in range(5)]
if len(v) > len(showlist) and not outOverwritten:
raise OutputRepetition(OutputRepetition.ErrText)
for i in v:
if "order" in showlist:
text[0] = str(k) + " " * (interval - len(str(k)) + 3)
if "info" in showlist and i["type"] == "info":
text[1] = cutstring(i["text"],interval + 3)
if "warn" in showlist and i["type"] == "warn":
text[2] = cutstring(i["text"],interval + 3)
if "err" in showlist and i["type"] == "err":
text[3] = cutstring(i["text"],interval + 3)
if "dat" in showlist and i["type"] == "dat":
text[4] = cutstring(i["text"],interval + 3)
if i["type"] not in showlist:
raise InternalError(InternalError.ErrText_stdout % i["type"])
#print(text)
"""
textmaxline = max(
(len(text[1]) if type(text[1]) == list else 0),
(len(text[2]) if type(text[2]) == list else 0),
(len(text[3]) if type(text[3]) == list else 0),
(len(text[4]) if type(text[4]) == list else 0)
)
"""
#优化后的textmaxline算法
textmaxline = max((len(i) if type(i) == list else 0) for i in text[1:])
#修复bug,使没有输出的格中填充空格
text = [([" " * (interval + 3) for _ in range(textmaxline)] if i == 0 else i) for i in text]
#print(textmaxline)
blocktext = []
#return
for i in range(textmaxline):
blocktext_il = [text[0]] if not i else [" " * (interval + 3) for _ in range(textmaxline)]
for o in range(1,5):
#if type(text[o]) == list:
if len(text[o]) > i:
blocktext_il.append(text[o][i])
# 修复第二行后前面多出一列的bug(对timestamp及每行有多个显示内容时基本无效)
if len(blocktext_il) >= len(showlist):
blocktext_il = blocktext_il[len(blocktext_il) - len(showlist):]
blocktext.append(blocktext_il)
del blocktext_il# 释放资源
timestamp_recut = cutstring(blocktext[0][0],interval + 4)
for i in range(len(timestamp_recut)):
if len(blocktext) >= i:
blocktext[i][0] = timestamp_recut[i]
if len(blocktext) > len(timestamp_recut):
for i in range(len(timestamp_recut),len(blocktext)):
blocktext[i][0] = " " * (interval + 4)
#print(blocktext)
#return
del text,timestamp_recut # 释放资源
#print("blocktext:")
#print(blocktext)
#return
text = "\n".join(["┃" + "│".join(i) + "┃" for i in blocktext])
#print("text:")
#print(text)
return text
def flush(segmType_k = None):
stdoutl = stdout
if NoInitError.is_init == False:
raise NoInitError(NoInitError.ErrText)
flush_start = time.time()
if flushType == "real":
text = []
for k,v in stdoutl.items():
text.append(gettextblock(k,v))
cls()
print("\n".join([head,*("".join([i,("\n" if text.index(i) != len(text) - 1 else ""),(sep if text.index(i) != len(text) - 1 else "")]) for i in text),end]))
elif flushType == "segm":
if "title" not in flush_typeReal_pringList:
cls()
print(head,end = "")
flush_typeReal_pringList.append("title")
else:
if segmType_k:
print("\n" + gettextblock(segmType_k,stdoutl[segmType_k]) + "\n" + sep,end = "")
flush_typeReal_pringList.append(segmType_k)
for k,v in stdoutl.items():
if k not in flush_typeReal_pringList:
print("\n" + gettextblock(k,v) + "\n" + sep,end = "")
flush_typeReal_pringList.append(k)
else:
raise FlushTypeError(FlushTypeError.ErrText % flushType)
if ds.flush_average_time:
ds.flush_average_time = (ds.flush_average_time + time.time() - flush_start) / 2
else:
ds.flush_average_time = time.time() - flush_start
"""
当datwrite之前的没有输出,dat会输出到前面而不是最后一列
"""
def init(timeshow = True,info = True,err = True,warn = True,dat = True,takeStderr = True):
global end,head,sep,showlist,interval
if NoInitError.is_init:return
if takeStderr:
global Sys_stderr
Sys_stderr = sys.stderr
sys.stderr = stderr_take
init_start = time.time()
interval = block_interval
fit = "┏"
title = ""
sep = "┣"
end = "┗"
showlist = []
titletextlen = []
titlelen = 0
if timeshow:
titlelen += 1
titletextlen.append(5)
showlist.append("order")
title += "order" + " " * (interval - 1)
if info:
titlelen += 1
titletextlen.append(4)
showlist.append("info")
title += "│info" + " " * (interval - 1)
if warn:
titlelen += 1
titletextlen.append(4)
showlist.append("warn")
title += "│warn" + " " * (interval - 1)
if err:
titlelen += 1
titletextlen.append(4)
showlist.append("err")
title += "│err " + " " * (interval - 1)
if dat:
titlelen += 1
titletextlen.append(3)
showlist.append("dat")
title += "│dat " + " " * (interval - 1)
for i in range(titlelen):
fit += "━" * (interval - 1 + titletextlen[i]) + ("┯" if titlelen - 1 != i else "━")
fit += "┓"
for i in range(titlelen):
sep += "━" * (interval - 1 + titletextlen[i]) + ("┿" if titlelen - 1 != i else "━")
sep += "┫"
for i in range(titlelen):
end += "━" * (interval - 1 + titletextlen[i]) + ("┷" if titlelen - 1 != i else "━")
end += "┛"
head = "".join([fit,"\n┃",title,"┃\n",sep])
#coloramaInit()
cls()
print(head + "\n" + end,end = "\n")
NoInitError.is_init = True
ds.init_time = time.time() - init_start
"""
# 测试用
stdout = {"1":[
{"type":"info","text":"1infotext..........." * 2},
{"type":"warn","text":"1warntext..........." * 2},
{"type":"err","text":"1errtext............" * 2}
],
"2":[
{"type":"info","text":"2infotext..........." * 2},
{"type":"warn","text":"2warntext..........." * 2},
{"type":"dat","text":"2dattext............" * 2}
]
}
"""
#block_interval = 11
"""
stdout[3] = [{"type":"info","text":"adtjkvd" * 3},
{"type":"dat","text":"adtjkvd" * 3}]
"""
if __name__ == "__main__":
init()
order.accuracy = 6
stderr.write("nihao" * 2)
stdinfo.write("nihao " * 5)
flush()
"""
win = tk.Tk()
win.title("Python_Std")
win.mainloop()"""
这是调用时的文件:
import tableOut as psr,random as r
from threading import Thread as td
psr.order.accuracy = 4
psr.order.type = "num"
psr.order.rand_range = (10,10)
psr.block_interval = 20
#psr.outOverwritten = True
#psr.autoFlush = False
psr.out_sep_time = 0.001
psr.flushType = "segm"
psr.init(takeStderr = 1)
def getstr():
t = ""
for i in range(r.randint(10,29)):
t += chr(r.randint(65,122))
return t
raisevar = 0
def raiseErr():
global raisevar
raisevar += 1
t = 1 / 0
print(t)
for i in range(30):
# 17.5s
psr.stdinfo.write("infowrite " + str(i) + getstr())
#psr.stdwarn.write("warnwrite " + str(i) + getstr())
#psr.stddat.write("datwrite " + str(i) + getstr())
psr.stderr.write("errwrite" + str(i) + getstr())
#psr.flush()
if i == 1:
ttd = td(target = raiseErr)
ttd.start()
"""
print("infowrite " + str(i) + getstr())
print("warnwrite " + str(i) + getstr())
print("datwrite " + str(i) + getstr())
print("errwrite" + str(i) + getstr())"""
psr.quit()
print(raisevar)
该调用的文件是想测试将sys.stderr重定向到自定义类并写入stdout字典,可是报错runtimeerror:dictionary changed size during iteration,请问如何解决该问题?(我尝试在flush函数中使用临时字典可是还是报错,由于重定向到自定义类了,但是该类报错导致错误在程序最后才输出了部分内容,没有报错行数及位置,虽然已知flush函数没有问题,所以不知道哪里有问题)