M03-Aiwin 2024-01-25 20:18 采纳率: 75%
浏览 10
已结题

Python线程池遇到的重复问题

多线程遇到的问题:

import concurrent.futures
import importlib
import queue
import itertools
import threading
import time
class ThreadPool():
    def __init__(self,thread):
        self.max_thread = thread
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_thread)
        self.task_queue = queue.Queue()
        self.futures = {}
    '''
        module:字符串,模块名称
        function:字符串,函数名
        usernamelist:用户名列表
        passwordlist:密码列表
    '''
    def add_port_brute_task(self,module,function,host,port,usernamelist,passwordlist):
        combinations = list(itertools.product(usernamelist, passwordlist))
        for combination in combinations:
            username, password = combination
            new_task=(module,function,host,port,username,password)
            self.task_queue.put(new_task)

    def start_port_brute_threadpool(self):
        start_time=time.time()
        while self.task_queue.qsize() != 0:
            module,function, host, port, username, password=self.task_queue.get()
            args=(host,port,username,password)
            future=self.thread_pool.submit(self.run_poc,module,function,args)
            self.futures[future] = (function, args)
        self.thread_pool.shutdown(wait=True)
        print("\n用时:%.2f 秒" % (time.time() - start_time))

    '''
    通过importlib获取到模块,获取函数直接执行
    '''
    def run_poc(self,module,func,args):
        module=importlib.import_module(module)
        function=getattr(module,func)
        result=function(*args)
        if result==True:
            self.thread_pool.shutdown(wait=False)

我在其它函数中创建这个ThreadPool的类,并且调用start_port_brute_threadpool()执行一个函数的时候,会出现这个同样的password数据被执行了很多次的情况,产生的原因是什么?我只想每个password只出现一次,并且按顺序来跑。

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2024-01-25 22:22
    关注

    【以下回答由 GPT 生成】

    问题的原因是在start_port_brute_threadpool()函数中,每次循环从任务队列中取出任务时,并没有判断任务是否已经被执行过。因此,当有多个线程同时从任务队列中获取任务时,可能会重复执行同一个任务。为了解决这个问题,我们可以使用一个字典来记录已经执行过的任务,每次取任务前先判断该任务是否已经被执行过,如果已经执行过,则跳过。以下是具体解决方案:

    首先,在__init__方法中添加一个lock对象来保证多线程访问futures字典时的线程安全。代码如下:

    def __init__(self, thread):
        self.max_thread = thread
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_thread)
        self.task_queue = queue.Queue()
        self.futures = {}
        self.lock = threading.Lock()
    

    然后,在start_port_brute_threadpool函数中,在执行任务前先判断该任务是否已经执行过,如果已经执行过,则跳过。代码如下:

    def start_port_brute_threadpool(self):
        start_time = time.time()
        while self.task_queue.qsize() != 0:
            module, function, host, port, username, password = self.task_queue.get()
            args = (host, port, username, password)
    
            # 判断该任务是否已经执行过,如果已经执行过,则跳过
            with self.lock:
                future_args = (function, args)
                if future_args in self.futures.values():
                    continue
    
            future = self.thread_pool.submit(self.run_poc, module, function, args)
            with self.lock:
                self.futures[future] = future_args
        self.thread_pool.shutdown(wait=True)
        print("\n用时:%.2f 秒" % (time.time() - start_time))
    

    以上就是解决问题的具体方案,通过在futures字典中记录已经执行过的任务,可以避免重复执行同一个任务。另外,为了保证多线程访问futures字典的线程安全,使用了threading.Lock对象来进行加锁。


    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 1月26日
  • 创建了问题 1月25日

悬赏问题

  • ¥100 三相直流充电模块对数字电源芯片在物理上它必须具备哪些功能和性能?
  • ¥30 数字电源对DSP芯片的具体要求
  • ¥20 antv g6 折线边如何变为钝角
  • ¥30 如何在Matlab或Python中 设置饼图的高度
  • ¥15 nginx中的CORS策略应该如何配置
  • ¥30 信号与系统实验:采样定理分析
  • ¥100 我想找人帮我写Python 的股票分析代码,有意请加mathtao
  • ¥20 Vite 打包的 Vue3 组件库,图标无法显示
  • ¥15 php 同步电商平台多个店铺增量订单和订单状态
  • ¥17 pro*C预编译“闪回查询”报错SCN不能识别