2201_75335496 2024-07-25 19:34 采纳率: 83.8%
浏览 2
已结题

python尝试将stderr到类时发生问题

下面代码是一个库,test是调用库用于测试的文件,运行test时,尝试测试sys.stderr写入自定义类时是否能正常输入到err输出的栏里,可是程序在引发用于测试的除零错误后程序会意外崩溃(有时还可以成功输出1行错误信息但还是会崩溃),请问如何解决?

tableOut.py

MoldelName = "tableOut"

import os,sys
import io
import re
import time
import random
import traceback
from threading import Lock

from tableOut_err import *
from tableOut_color import bg,fg,style

#from colorama import Fore,Back,Style,coloramaInit

outOverwritten = False # 输出时是否覆盖输出,如否当出现覆盖时抛出异常
block_interval = 20 # 每格的横轴宽度(字符)
out_sep_time   = 0  # 每次刷新屏幕的时间间隔
autoFlush = True

flushLock = Lock()

"""flushType类型:
real 实时:
    实时刷新所有屏幕内容,会清空屏幕
    适合用于导出输出后查看
segm 分段:
    分段输出内容,较快且易于观察实时输出
    适合实时查看输出的情况
"""
flushType = "segm"
flush_typeReal_pringList = []

stdout = {}


class Order:
    def __init__(self):
        """
        序号类型:
            stamp      时间戳:
             ┣time.time时间戳
             ┗如为这个类型,self.accuracy即为时间戳精度
            num        数字序列:
             ┣从self.num_start开始
             ┗如为这个类型,self.num_start即为序列起始序号
            rand       随机数:
             ┣random.randint随机数
             ┣如为这个类型,self.rand_range即为随机范围,
             ┗do_rand_no_repet即可使随机数不重复(见下文)
            randForNum 随机数:
             ┣同rand类型
             ┣如为这个类型,在rand类型的基础上(do_rand_no_repet不起效),
             ┗会在被重复使用的数字后添加序号
        """
        self.type = "num" # 序号类型为时间戳
        self.accuracy = 1 # 正整数代表小数点后位数,负数代表小数点前位数
        self.num_start = 1 # 序列起始序号
        self.rand_range = (1000,9999) # 随机范围
        """
        is_rand_down_do操作类型:
            raise    抛出错误
            whileNum 重新使用所有随机数并在后面添加序号
            while    重新使用所有随机数但不在后面添加序号
        """
        self.is_rand_down_do = "raise" # 当随机数用完时进行的操作
        self.do_rand_no_repet = True # 如为True可使随机数不重复,但当所有数字即将用完时会产生延时,用完后根据self.is_rand_down_do确定操作
        
        self.num_list = self.num_start
        self.rand_usenumlist = []
        self.rand_usedownnumdir = {}
        self.randForNum_usenumdir = {}
        
    def getOrder(self):
        if self.type == "stamp":
            return round(time.time(),self.accuracy)
        elif self.type == "num":
            self.num_list += 1
            return self.num_list - 1
        elif self.type == "rand":
            r = random.randint(*self.rand_range)
            if not self.do_rand_no_repet:
                return r
            else:
                while (r in self.rand_usenumlist):
                    if len(self.rand_usenumlist) >= len(list(range(*self.randrange))) + 1:
                        self.doRandDown()
                    r = random.randint(*self.rand_range)
                self.rand_usenumlist.append(r)
                if len(self.rand_usedownnumdir):
                    self.rand_usedownnumdir[r] += 1
                    r = f"{r} ({self.rand_usedownnumdir[r]})"
                return r
        elif self.type == "randForNum":
            r = random.randint(*self.rand_range)
            if r in self.randForNum_usenumdir:
                self.randForNum_usenumdir[r] += 1
            else:
                self.randForNum_usenumdir[r] = 1
            return f"{r} ({self.randForNum_usenumdir[r]})"
        else:
            return "No Orders"
    
    def doRandDown(self):
        if self.is_rand_down_do == "raise":
            raise RandNumUsageError(RandNumUsageError.ErrText)
        elif self.is_rand_down_do == "whileNum":
            print(bg.gray(style.dark("Wait for rand-whileNum Mod doing...")),end = "")
            for i in self.randrange:
                self.rand_usedownnumdir[i] = 1
            print("\r",end = "")
        elif self.is_rand_down_do == "while":
            self.rand_usenumlist = []

order = Order()

class Std_:
    def __init__(self,type):
        self.type = type
    def write(self,text,):
        if NoInitError.is_init == False:
            raise NoInitError(NoInitError.ErrText)
        timestamp = order.getOrder()
        with flushLock:
            try:
                stdout[timestamp].append({"type":self.type,"text":text})
            except:
                stdout[timestamp] = [{"type":self.type,"text":text}]
            if autoFlush:
                try:
                    flush()
                except Exception as e:
                    print(fg.red(traceback.format_exc()))
                    #stderr_take.write(traceback.format_exc())
class Stdinfo(Std_):pass
class Stderr(Std_):pass
class Stdwarn(Std_):pass
class Stddat(Std_):pass

stdinfo = Stdinfo("info")
stdwarn = Stdwarn("warn")
stderr  = Stderr("err")
stddat  = Stddat("dat")

class Stderr_take(io.TextIOBase):
    def __init__(self):
        self.content = ""
        self.is_writeErrInStderr = not False # 是否写入错误到stderr,如为True会在stderr中显示错误内容,但前提必须打开err表格栏为显示状态,且不保证其稳定性
        #with open("./err.txt","w",encoding = "utf-8") as f:
        #    f.write("")
    def write(self,text):
        #with open("./err.txt","a",encoding = "utf-8") as f:
        #    f.write(text)
        if self.is_writeErrInStderr:
            try:
                stderr.write(text)
            except:
                stdwarn.write(f"{MoldelName}StderrWarn:The Err is not write down.")
                self.content += text
stderr_take = Stderr_take()

#sys.stdout = Stdout()

class DataStat:
    def __init__(self):
        self.init_time = 0
        self.flush_average_time = 0
ds = DataStat()
def quit(info = True,tr = True,septime = 0.1):
    global os,sys,time,io,re,random,stdout
    time.sleep(septime)
    flush()
    if flushType == "segm":
        print("\r" + end)
    if info:
        print(bg.lgreen(fg.black("\n╔") + (fg.black("═" * (interval + 4) * len(showlist))) + fg.black("╗\n║") + style.bold(fg.black("Info:")) + (" " * ((interval + 4) * len(showlist) - 5)) + fg.black(f"║\n║\t") + fg.gray(style.bold("init_time:" + style.dark(f"{ds.init_time}"))) + (" " * ((interval + 4) * len(showlist) - 17 - len(str(ds.init_time)))) + fg.black(f"║\n║\t") + fg.black(style.bold("flush_average_time:" + style.dark(f"{ds.flush_average_time}"))) + (" " * ((interval + 4) * len(showlist) - 26 - len(str(ds.flush_average_time)))) + fg.black("║")))
        print(bg.lgreen(fg.black("╚") + (fg.black("═" * (interval + 4) * len(showlist))) + fg.black("╝")))
        print(bg.gray(fg.cyan("Lose Error:\n") + fg.red(stderr_take.content)) if stderr_take.content else "")
        print(style.dark(fg.yellow("Wait for quit...")))
    del os,sys,time,io,re,random,stdout
    if tr:
        return ds

def cls():
    os.system('cls' if os.name == 'nt' else 'clear')
    if out_sep_time:
        time.sleep(out_sep_time)

def cutstring(t,l):
    t = t.replace("\n","\\n").replace("\t","    ").replace("\r","\\r")
    tl = [t[i:i + l] for i in range(0,len(t),l)]
    if not tl:
        tl = [" " * (l)]
        return tl
    if len(tl[-1]) < l:
        tl[-1] = tl[-1] + " " * (l - len(tl[-1]))
    return tl

def gettextblock(k,v):
    interval = block_interval
    text = [0 for _ in range(5)]
    if len(v) > len(showlist) and not outOverwritten:
        raise OutputRepetition(OutputRepetition.ErrText)
    for i in v:
        if "order" in showlist:
            text[0] = str(k) + " " * (interval - len(str(k)) + 3)
        if "info" in showlist and i["type"] == "info":
            text[1] = cutstring(i["text"],interval + 3)
        if "warn" in showlist and i["type"] == "warn":
            text[2] = cutstring(i["text"],interval + 3)
        if "err" in showlist and i["type"] == "err":
            text[3] = cutstring(i["text"],interval + 3)
        if "dat" in showlist and i["type"] == "dat":
            text[4] = cutstring(i["text"],interval + 3)
        if i["type"] not in showlist:
            raise InternalError(InternalError.ErrText_stdout.format(i["type"]))
    #print(text)
    # 特定列设置颜色
    #text[0] = fg.blue(text[0])
    text[1] = [fg.white(i) for i in text[1]] if text[1] else text[1]
    text[2] = [fg.yellow(i) for i in text[2]] if text[2] else text[2]
    text[3] = [fg.red(i) for i in text[3]] if text[3] else text[3]
    text[4] = [fg.gray(i) for i in text[4]] if text[4] else text[4]
    """
    textmaxline = max(
                   (len(text[1]) if type(text[1]) == list else 0),
                   (len(text[2]) if type(text[2]) == list else 0),
                   (len(text[3]) if type(text[3]) == list else 0),
                   (len(text[4]) if type(text[4]) == list else 0)
                  )
    """
    #优化后的textmaxline算法
    textmaxline = max((len(i) if type(i) == list else 0) for i in text[1:])
    #修复bug,使没有输出的格中填充空格
    text = [([" " * (interval + 3) for _ in range(textmaxline)] if i == 0 else i) for i in text]
    #print(textmaxline)
    blocktext = []
    #return
    for i in range(textmaxline):
        blocktext_il = [text[0]] if not i else [" " * (interval + 3) for _ in range(textmaxline)]
        for o in range(1,5):
            #if type(text[o]) == list:
                if len(text[o]) > i:
                    blocktext_il.append(text[o][i])
        # 修复第二行后前面多出一列的bug(对timestamp及每行有多个显示内容时基本无效)
        if len(blocktext_il) >= len(showlist):
            blocktext_il = blocktext_il[len(blocktext_il) - len(showlist):]
        blocktext.append(blocktext_il)
        del blocktext_il# 释放资源
    
    timestamp_recut = cutstring(blocktext[0][0],interval + 4)
    for i in range(len(timestamp_recut)):
        if len(blocktext) >= i:
            blocktext[i][0] = timestamp_recut[i]
    
    if len(blocktext) > len(timestamp_recut):
        for i in range(len(timestamp_recut),len(blocktext)):
            blocktext[i][0] = " " * (interval + 4)
    #print(blocktext)
    #return
    del text,timestamp_recut # 释放资源
    
    #print("blocktext:")
    #print(blocktext)
    #return
    text = "\n".join(["┃" + "│".join(i) + "┃" for i in blocktext])
    #print("text:")
    #print(text)
    return text

def flush(segmType_k = None):
    global flush_typeReal_pringList
    if NoInitError.is_init == False:
        raise NoInitError(NoInitError.ErrText)
    flush_start = time.time()
    if flushType == "real":
        text = []
        for k,v in stdout.items():
            text.append(gettextblock(k,v))
        cls()
        print("\n".join([head,*("".join([i,("\n" if text.index(i) != len(text) - 1 else ""),(sep if text.index(i) != len(text) - 1 else "")]) for i in text),end]))
    elif flushType == "segm":
        if "title" not in flush_typeReal_pringList:
            cls()
            print(head,end = "")
            flush_typeReal_pringList.append("title")
        else:
            if segmType_k:
                print("\n" + gettextblock(segmType_k,stdout[segmType_k]) + "\n" + sep,end = "")
                flush_typeReal_pringList.append(segmType_k)
            for k,v in stdout.items():
                if k not in flush_typeReal_pringList:
                    print("\n" + gettextblock(k,v) + "\n" + sep,end = "")
                    flush_typeReal_pringList.append(k)
    else:
        raise FlushTypeError(FlushTypeError.ErrText.format(flushType))
    if ds.flush_average_time:
        ds.flush_average_time = (ds.flush_average_time + time.time() - flush_start) / 2
    else:
        ds.flush_average_time = time.time() - flush_start
"""
当datwrite之前的没有输出,dat会输出到前面而不是最后一列
"""


def init(timeshow = True,info = True,err = True,warn = True,dat = True,takeStderr = True):
    global end,head,sep,showlist,interval
    if NoInitError.is_init:return
    
    if takeStderr:
        global Sys_stderr
        Sys_stderr = sys.stderr
        sys.stderr = stderr_take
    
    init_start = time.time()
    interval = block_interval
    fit = "┏"
    title = ""
    sep = "┣"
    end = "┗"
    showlist = []
    titletextlen = []
    titlelen = 0
    if timeshow:
        titlelen += 1
        titletextlen.append(5)
        showlist.append("order")
        title += style.bold("order") + " " * (interval - 1)
    if info:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("info")
        title += "│" + style.bold("info") + " " * (interval - 1)
    if warn:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("warn")
        title += "│" + style.bold("warn") + " " * (interval - 1)
    if err:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("err")
        title += "│" + style.bold("err ") + " " * (interval - 1)
    if dat:
        titlelen += 1
        titletextlen.append(3)
        showlist.append("dat")
        title += "│dat " + " " * (interval - 1)
    
    for i in range(titlelen):
        fit += "━" * (interval - 1 + titletextlen[i]) + ("┯" if titlelen - 1 != i else "━")
    fit += "┓"
    
    for i in range(titlelen):
        sep += "━" * (interval - 1 + titletextlen[i]) + ("┿" if titlelen - 1 != i else "━")
    sep += "┫"
    
    for i in range(titlelen):
        end += "━" * (interval - 1 + titletextlen[i]) + ("┷" if titlelen - 1 != i else "━")
    end += "┛"
    
    head = "".join([fit,"\n┃",title,"┃\n",sep])
    #coloramaInit()
    cls()
    print(head + "\n" + end,end = "\n")
    
    NoInitError.is_init = True
    ds.init_time = time.time() - init_start

"""
# 测试用
stdout = {"1":[
               {"type":"info","text":"1infotext..........." * 2},
               {"type":"warn","text":"1warntext..........." * 2},
               {"type":"err","text":"1errtext............" * 2}
              ],
          "2":[
               {"type":"info","text":"2infotext..........." * 2},
               {"type":"warn","text":"2warntext..........." * 2},
               {"type":"dat","text":"2dattext............" * 2}
              ]
         }
"""
#block_interval = 11
"""
stdout[3] = [{"type":"info","text":"adtjkvd" * 3},
             {"type":"dat","text":"adtjkvd" * 3}]
"""
if __name__ == "__main__":
    init()
    order.accuracy = 6
    stderr.write("nihao" * 2)
    stdinfo.write("nihao " * 5)
    flush()
    quit()
"""
win = tk.Tk()
win.title("Python_Std")

win.mainloop()"""

test.py

import tableOut as psr,random as r
from threading import Thread as td
import time
"""
psr.order.accuracy = 4
psr.order.type = "num"
psr.order.rand_range = (10,10)
psr.block_interval = 15
#psr.outOverwritten = True
"""
psr.autoFlush = False
"""
psr.out_sep_time = 0.001
psr.flushType = "segm"
"""
psr.init(takeStderr = 1)
def getstr():
    t = ""
    for i in range(r.randint(10,29)):
        t += chr(r.randint(65,122))
    return t
raisevar = 0
def raiseErr():
    global raisevar
    raisevar += 1
    t = 1 / 0
    print(t)
for i in range(200):
    # 17.5s
    psr.stdinfo.write("infowrite " + str(i) + getstr())
    psr.stdwarn.write("warnwrite " + str(i) + getstr())
    psr.stddat.write("datwrite " + str(i) + getstr())
    #psr.stderr.write("errwrite" + str(i) + getstr())
    psr.flush()
    if i == 1:
        ttd = td(target = raiseErr)
        ttd.start()
    """
    print("infowrite " + str(i) + getstr())
    print("warnwrite " + str(i) + getstr())
    print("datwrite " + str(i) + getstr())
    print("errwrite" + str(i) + getstr())"""

psr.quit(septime = 0.1)
print(raisevar)
  • 写回答

5条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-07-25 19:34
    关注

    引自免费微信小程序:皆我百晓生

    从你的问题描述来看,你遇到的问题似乎是在尝试将错误输出到自定义类时引发的异常没有被正确处理。根据你提供的代码,当尝试引发除零错误时,程序会崩溃。这可能是由于在尝试写入错误到自定义类时出现了问题。让我们一步步来解决这个问题。

    首先,让我们检查你的Stderr_take类。这个类似乎是用来捕获错误并将其写入一个特定的位置(可能是某种日志文件或控制台)。如果在这个过程中发生异常,你可能没有正确处理它,导致了程序崩溃。一种可能的问题是你可能在写入错误时没有正确地处理可能的异常,如文件写入错误或其他IO异常。你可能需要在写入操作周围添加异常处理来捕获这些异常并避免程序崩溃。例如:

    class Stderr_take(io.TextIOBase):
        def __init__(self):
            self.content = ""
            self.is_writeErrInStderr = not False # 是否写入错误到stderr,如为True会在stderr中显示错误内容,但前提必须打开err表格栏为显示状态
    
        def write(self, text):
            try:
                if self.is_writeErrInStderr:
                    stderr.write(text) # 注意此处没有异常处理
                else:
                    self.content += text
            except Exception as e:
                print(f"Error writing to stderr: {e}")  # 添加异常处理,打印错误信息而不是崩溃
    

    其次,请检查你的order类和相关的写操作函数,特别是那些涉及到随机数生成的部分。如果随机数生成出现问题(例如超出范围),可能会导致程序崩溃。确保你对所有可能的异常情况进行了处理。你可能需要添加一些异常处理来捕获这些异常并给出更有用的错误信息。

    最后,你的flush函数在处理输出时可能也存在一些问题。当在输出大量数据时,特别是在使用分段刷新时,可能会出现问题。你需要确保你的刷新逻辑是正确的,并且能正确处理各种可能的异常情况。你可能需要添加一些异常处理来捕获并处理可能出现的错误。

    由于你的问题涉及到多个部分和复杂的逻辑,解决这个问题可能需要一些时间和耐心。我建议你一步一步地检查你的代码,先从最可能出错的地方开始检查,然后逐步排除问题。同时,你也可以尝试简化你的代码,以便更容易地找到问题所在。如果你有任何具体的错误消息或更多的上下文信息,那将更有助于我为你提供更具体的帮助。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(4条)

报告相同问题?

问题事件

  • 系统已结题 8月12日
  • 已采纳回答 8月4日
  • 创建了问题 7月25日

悬赏问题

  • ¥15 乌班图ip地址配置及远程SSH
  • ¥15 怎么让点阵屏显示静态爱心,用keiluVision5写出让点阵屏显示静态爱心的代码,越快越好
  • ¥15 PSPICE制作一个加法器
  • ¥15 javaweb项目无法正常跳转
  • ¥15 VMBox虚拟机无法访问
  • ¥15 skd显示找不到头文件
  • ¥15 机器视觉中图片中长度与真实长度的关系
  • ¥15 fastreport table 怎么只让每页的最下面和最顶部有横线
  • ¥15 java 的protected权限 ,问题在注释里
  • ¥15 这个是哪里有问题啊?