2201_75335496 2024-07-24 19:22 采纳率: 76.5%
浏览 0
已结题

python关于线程安全的问题

如何将这段代码中的Std_类和flush函数完全线程安全?

MoldelName = "tableOut"

import os,sys
import io
import re
import time
import random
from threading import RLock
#from colorama import Fore,Back,Style,coloramaInit

outOverwritten = False # 输出时是否覆盖输出,如否当出现覆盖时抛出异常
block_interval = 20 # 每格的横轴宽度(字符)
out_sep_time   = 0  # 每次刷新屏幕的时间间隔
autoFlush = True

flushLock = RLock()

"""flushType类型:
real 实时:
    实时刷新所有屏幕内容,会清空屏幕
    适合用于导出输出后查看
segm 分段:
    分段输出内容,较快且易于观察实时输出
    适合实时查看输出的情况
"""
flushType = "real"
flush_typeReal_pringList = []

stdout = {}

class FlushTypeError(Exception):
    ErrText = "\"flushType\" %s is not different."

class NoInitError(Exception):
    ErrText = f"No initialization has been performed, please call the \"init\" function to initialize {MoldelName}!"
    is_init = False

class InternalError(Exception):
    ErrText_stdout = "\tError in variable \"stdout\", unknown type \"%s\".\n\t\tPerhaps due to incorrect type input when customizing the \"stdout\" variable."

class OutputRepetition(Exception):
    ErrText = "Output overwritten, please increase timestamp accuracy or specify the 'outOverwritten' variable as True."

class Order:
    def __init__(self):
        """
        序号类型:
            stamp      时间戳:
             ┣time.time时间戳
             ┗如为这个类型,self.accuracy即为时间戳精度
            num        数字序列:
             ┣从self.num_start开始
             ┗如为这个类型,self.num_start即为序列起始序号
            rand       随机数:
             ┣random.randint随机数
             ┣如为这个类型,self.rand_range即为随机范围,
             ┗do_rand_no_repet即可使随机数不重复(见下文)
            randForNum 随机数:
             ┣同rand类型
             ┣如为这个类型,在rand类型的基础上(do_rand_no_repet不起效),
             ┗会在被重复使用的数字后添加序号
        """
        self.type = "stamp" # 序号类型0
        self.accuracy = 1 # 正整数代表小数点后位数,负数代表小数点前位数
        self.num_start = 1 # 序列起始序号
        self.rand_range = (1000,9999) # 随机范围
        """
        is_rand_down_do操作类型:
            raise    抛出错误
            whileNum 重新使用所有随机数并在后面添加序号
            while    重新使用所有随机数但不在后面添加序号
        """
        self.is_rand_down_do = "raise" # 当随机数用完时进行的操作
        self.do_rand_no_repet = True # 如为True可使随机数不重复,但当所有数字即将用完时会产生延时,用完后根据self.is_rand_down_do确定操作
        
        self.num_list = self.num_start
        self.rand_usenumlist = []
        self.rand_usedownnumdir = {}
        self.randForNum_usenumdir = {}
        
    def getOrder(self):
        if self.type == "stamp":
            return round(time.time(),self.accuracy)
        elif self.type == "num":
            self.num_list += 1
            return self.num_list - 1
        elif self.type == "rand":
            r = random.randint(*self.rand_range)
            if not self.do_rand_no_repet:
                return r
            else:
                while (r in self.rand_usenumlist):
                    if len(self.rand_usenumlist) >= len(list(range(*self.randrange))) + 1:
                        self.doRandDown()
                    r = random.randint(*self.rand_range)
                self.rand_usenumlist.append(r)
                if len(self.rand_usedownnumdir):
                    self.rand_usedownnumdir[r] += 1
                    r = f"{r} ({self.rand_usedownnumdir[r]})"
                return r
        elif self.type == "randForNum":
            r = random.randint(*self.rand_range)
            if r in self.randForNum_usenumdir:
                self.randForNum_usenumdir[r] += 1
            else:
                self.randForNum_usenumdir[r] = 1
            return f"{r} ({self.randForNum_usenumdir[r]})"
        else:
            return "No Orders"
    
    def doRandDown(self):
        if self.is_rand_down_do == "raise":
            class RandNumUsageError(Exception):ErrText = "Random numbers used up!"
            raise RandNumUsageError(RandNumUsageError.ErrText)
        elif self.is_rand_down_do == "whileNum":
            print("Wait for rand-whileNum Mod doing...",end = "")
            for i in self.randrange:
                self.rand_usedownnumdir[i] = 1
            print("\r",end = "")
        elif self.is_rand_down_do == "while":
            self.rand_usenumlist = []

order = Order()

class Std_:
    def __init__(self,type):
        self.type = type
    def write(self,text,):
        if NoInitError.is_init == False:
            raise NoInitError(NoInitError.ErrText)
        timestamp = order.getOrder()
        with flushLock:
            try:
                stdout[timestamp].append({"type":self.type,"text":text})
            except:
                stdout[timestamp] = [{"type":self.type,"text":text}]
            if autoFlush:
                flush()
class Stdinfo(Std_):pass
class Stderr(Std_):pass
class Stdwarn(Std_):pass
class Stddat(Std_):pass

stdinfo = Stdinfo("info")
stdwarn = Stdwarn("warn")
stderr  = Stderr("err")
stddat  = Stddat("dat")

class Stderr_take(io.TextIOBase):
    def __init__(self):
        self.content = ""
    def write(self,text):
        stderr.write(text)
        self.content += text
stderr_take = Stderr_take()

#sys.stdout = Stdout()

class DataStat:
    def __init__(self):
        self.init_time = 0
        self.flush_average_time = 0
ds = DataStat()
def quit(info = True,tr = True):
    global os,sys,time,io,re,random,stdout
    if flushType == "segm":
        print("\r" + end)
    if info:
        print("╔" + ("═" * (interval + 4) * len(showlist)) + "╗\n║Info:" + (" " * ((interval + 4) * len(showlist) - 5)) + f"║\n║\tinit_time:{ds.init_time}" + (" " * ((interval + 4) * len(showlist) - 17 - len(str(ds.init_time)))) + f"║\n║\tflush_average_time:{ds.flush_average_time}" + (" " * ((interval + 4) * len(showlist) - 26 - len(str(ds.flush_average_time)))) + "║")
        print("╚" + ("═" * (interval + 4) * len(showlist)) + "╝")
        print("Lose Error:\n" + stderr_take.content if stderr_take.content else "")
        print("Wait for quit...")
    del os,sys,time,io,re,random,stdout
    if tr:
        return ds

def cls():
    os.system('cls' if os.name == 'nt' else 'clear')
    if out_sep_time:
        time.sleep(out_sep_time)

def cutstring(t,l):
    t = t.replace("\n","\\n").replace("\t","    ").replace("\r","\\r")
    tl = [t[i:i + l] for i in range(0,len(t),l)]
    if len(tl[-1]) < l:
        tl[-1] = tl[-1] + " " * (l - len(tl[-1]))
    return tl

def gettextblock(k,v):
    interval = block_interval
    text = [0 for _ in range(5)]
    if len(v) > len(showlist) and not outOverwritten:
        raise OutputRepetition(OutputRepetition.ErrText)
    for i in v:
        if "order" in showlist:
            text[0] = str(k) + " " * (interval - len(str(k)) + 3)
        if "info" in showlist and i["type"] == "info":
            text[1] = cutstring(i["text"],interval + 3)
        if "warn" in showlist and i["type"] == "warn":
            text[2] = cutstring(i["text"],interval + 3)
        if "err" in showlist and i["type"] == "err":
            text[3] = cutstring(i["text"],interval + 3)
        if "dat" in showlist and i["type"] == "dat":
            text[4] = cutstring(i["text"],interval + 3)
        if i["type"] not in showlist:
            raise InternalError(InternalError.ErrText_stdout % i["type"])
    #print(text)
    
    """
    textmaxline = max(
                   (len(text[1]) if type(text[1]) == list else 0),
                   (len(text[2]) if type(text[2]) == list else 0),
                   (len(text[3]) if type(text[3]) == list else 0),
                   (len(text[4]) if type(text[4]) == list else 0)
                  )
    """
    #优化后的textmaxline算法
    textmaxline = max((len(i) if type(i) == list else 0) for i in text[1:])
    #修复bug,使没有输出的格中填充空格
    text = [([" " * (interval + 3) for _ in range(textmaxline)] if i == 0 else i) for i in text]
    #print(textmaxline)
    blocktext = []
    #return
    for i in range(textmaxline):
        blocktext_il = [text[0]] if not i else [" " * (interval + 3) for _ in range(textmaxline)]
        for o in range(1,5):
            #if type(text[o]) == list:
                if len(text[o]) > i:
                    blocktext_il.append(text[o][i])
        # 修复第二行后前面多出一列的bug(对timestamp及每行有多个显示内容时基本无效)
        if len(blocktext_il) >= len(showlist):
            blocktext_il = blocktext_il[len(blocktext_il) - len(showlist):]
        blocktext.append(blocktext_il)
        del blocktext_il# 释放资源
    
    timestamp_recut = cutstring(blocktext[0][0],interval + 4)
    for i in range(len(timestamp_recut)):
        if len(blocktext) >= i:
            blocktext[i][0] = timestamp_recut[i]
    
    if len(blocktext) > len(timestamp_recut):
        for i in range(len(timestamp_recut),len(blocktext)):
            blocktext[i][0] = " " * (interval + 4)
    #print(blocktext)
    #return
    del text,timestamp_recut # 释放资源
    
    #print("blocktext:")
    #print(blocktext)
    #return
    text = "\n".join(["┃" + "│".join(i) + "┃" for i in blocktext])
    #print("text:")
    #print(text)
    return text

def flush(segmType_k = None):
    global flush_typeReal_pringList

    if NoInitError.is_init == False:
        raise NoInitError(NoInitError.ErrText)
    flush_start = time.time()
    if flushType == "real":
        text = []
        for k,v in stdout.items():
            text.append(gettextblock(k,v))
        cls()
        print("\n".join([head,*("".join([i,("\n" if text.index(i) != len(text) - 1 else ""),(sep if text.index(i) != len(text) - 1 else "")]) for i in text),end]))
    elif flushType == "segm":
        if "title" not in flush_typeReal_pringList:
            cls()
            print(head,end = "")
            flush_typeReal_pringList.append("title")
        else:
            if segmType_k:
                print("\n" + gettextblock(segmType_k,stdout[segmType_k]) + "\n" + sep,end = "")
                flush_typeReal_pringList.append(segmType_k)
            for k,v in stdout.items():
                if k not in flush_typeReal_pringList:
                    print("\n" + gettextblock(k,v) + "\n" + sep,end = "")
                    flush_typeReal_pringList.append(k)
    else:
        raise FlushTypeError(FlushTypeError.ErrText % flushType)
    if ds.flush_average_time:
        ds.flush_average_time = (ds.flush_average_time + time.time() - flush_start) / 2
    else:
        ds.flush_average_time = time.time() - flush_start
"""
当datwrite之前的没有输出,dat会输出到前面而不是最后一列
"""


def init(timeshow = True,info = True,err = True,warn = True,dat = True,takeStderr = True):
    global end,head,sep,showlist,interval
    if NoInitError.is_init:return
    
    if takeStderr:
        global Sys_stderr
        Sys_stderr = sys.stderr
        sys.stderr = stderr_take
    
    init_start = time.time()
    interval = block_interval
    fit = "┏"
    title = ""
    sep = "┣"
    end = "┗"
    showlist = []
    titletextlen = []
    titlelen = 0
    if timeshow:
        titlelen += 1
        titletextlen.append(5)
        showlist.append("order")
        title += "order" + " " * (interval - 1)
    if info:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("info")
        title += "│info" + " " * (interval - 1)
    if warn:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("warn")
        title += "│warn" + " " * (interval - 1)
    if err:
        titlelen += 1
        titletextlen.append(4)
        showlist.append("err")
        title += "│err " + " " * (interval - 1)
    if dat:
        titlelen += 1
        titletextlen.append(3)
        showlist.append("dat")
        title += "│dat " + " " * (interval - 1)
    
    for i in range(titlelen):
        fit += "━" * (interval - 1 + titletextlen[i]) + ("┯" if titlelen - 1 != i else "━")
    fit += "┓"
    
    for i in range(titlelen):
        sep += "━" * (interval - 1 + titletextlen[i]) + ("┿" if titlelen - 1 != i else "━")
    sep += "┫"
    
    for i in range(titlelen):
        end += "━" * (interval - 1 + titletextlen[i]) + ("┷" if titlelen - 1 != i else "━")
    end += "┛"
    
    head = "".join([fit,"\n┃",title,"┃\n",sep])
    #coloramaInit()
    cls()
    print(head + "\n" + end,end = "\n")
    
    NoInitError.is_init = True
    ds.init_time = time.time() - init_start

"""
# 测试用
stdout = {"1":[
               {"type":"info","text":"1infotext..........." * 2},
               {"type":"warn","text":"1warntext..........." * 2},
               {"type":"err","text":"1errtext............" * 2}
              ],
          "2":[
               {"type":"info","text":"2infotext..........." * 2},
               {"type":"warn","text":"2warntext..........." * 2},
               {"type":"dat","text":"2dattext............" * 2}
              ]
         }
"""
#block_interval = 11
"""
stdout[3] = [{"type":"info","text":"adtjkvd" * 3},
             {"type":"dat","text":"adtjkvd" * 3}]
"""
if __name__ == "__main__":
    init()
    order.accuracy = 6
    stderr.write("nihao" * 2)
    stdinfo.write("nihao " * 5)
    flush()
"""
win = tk.Tk()
win.title("Python_Std")

win.mainloop()"""
  • 写回答

2条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-07-24 19:22
    关注

    引自免费微信小程序:皆我百晓生

    确保Python代码中的Std_类和flush函数完全线程安全,我们需要对共享资源进行加锁,并确保在并发情况下不会出现数据竞争或混乱。以下是实现这一目标的步骤和建议:

    1. 使用锁进行同步

    你已经使用了RLock来创建一个锁flushLock,这是一个很好的开始。你应该在所有访问共享资源(如stdout字典)的地方使用这个锁。

    2. 修改Std_类的方法

    Std_类的方法中,如write方法,应该使用flushLock来确保线程安全。这样,当一个线程正在写入数据时,其他线程必须等待,直到锁被释放。

    3. 在flush函数中加锁

    flush函数中,当你访问或修改stdout时,也要使用flushLock。这样可以确保在输出到屏幕或进行其他相关操作时不会发生数据混乱。

    示例代码

    以下是一个简化版的示例,展示如何在关键部分使用锁来保证线程安全:

    # ... 其他代码保持不变 ...
    
    class Std_:
        def __init__(self, type):
            self.type = type
            
        def write(self, text):
            with flushLock:  # 使用锁来同步访问
                # ... 其他代码保持不变 ...
                
    # ... 其他代码保持不变,但在flush函数中使用锁 ...
    def flush(segmType_k = None):
        global flushLock, stdout  # 确保全局变量在函数内可访问
        with flushLock:  # 使用锁来保护对stdout的访问
            # ... 其他代码保持不变 ...
    

    注意事项

    • 确保在所有访问或修改共享资源(如stdout)的代码段中都使用相同的锁。
    • 考虑使用更高级的并发控制结构,如信号量或条件变量,如果需要在多个线程之间进行更复杂的协调。
    • 测试你的代码在多线程环境下的表现,确保它按预期工作。

    通过这些步骤和修改,你的代码应该能够在多线程环境中安全地运行。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 8月12日
  • 已采纳回答 8月4日
  • 创建了问题 7月24日

悬赏问题

  • ¥20 wpf datagrid单元闪烁效果失灵
  • ¥15 券商软件上市公司信息获取问题
  • ¥100 ensp启动设备蓝屏,代码clock_watchdog_timeout
  • ¥15 Android studio AVD启动不了
  • ¥15 陆空双模式无人机怎么做
  • ¥15 想咨询点问题,与算法转换,负荷预测,数字孪生有关
  • ¥15 C#中的编译平台的区别影响
  • ¥15 软件供应链安全是跟可靠性有关还是跟安全性有关?
  • ¥15 电脑蓝屏logfilessrtsrttrail问题
  • ¥20 关于wordpress建站遇到的问题!(语言-php)(相关搜索:云服务器)