Tony Einstein 2023-05-05 11:52 采纳率: 45%
浏览 39
已结题

python进行多进程的时候报错futures.process._RemoteTraceback?

问题描述

python进行多进程处理的时候报错。

问题详细

python进行多进程的时候报错,程序中有写日志行为、PooledDB连接池行为,但是都已经加锁。
不加多进程的时候,是可以正常运行的。

数据库连接用的是这个:

from DBUtils.PooledDB import PooledDB, SharedDBConnection

class DataBase:
    def __init__(self,arg):
        self.account = arg.sql_account
        self.password = arg.sql_password
        self.host = arg.sql_host
        self.port = arg.sql_port
        self.db = arg.sql_db
        # ---------------------------------------------------------
        self.table_input = arg.table_input
        self.table_output_main = arg.table_output_main
        self.table_output_sub = arg.table_output_sub
        # ---------------------------------------------------------
        self.pool = self.init_pool()
        self.lock = Lock()

    def init_pool(self):
        """
        creator:数据库驱动模块,如常见的pymysql,pymssql,cx_Oracle模块。无默认值
        mincached:初始化连接池时创建的连接数。默认为0,即初始化时不创建连接。(建议默认0,假如非0的话,在某些数据库不可用时,整个项目会启动不了)
        maxcached:池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认)
        maxshared:池中共享连接的最大数量。默认为0,即每个连接都是专用的,不可共享(不常用,建议默认)
        maxconnections:被允许的最大连接数。默认为0,无最大数量限制。(视情况而定)
        blocking:连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。(建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接)
        maxusage:连接的最大使用次数。默认0,即无使用次数限制。(建议默认)
        setsession:可选的SQL命令列表,可用于准备会话。(例如设置时区)
        reset:当连接返回到池中时,重置连接的方式。默认True,总是执行回滚。(不太清楚,建议默认)
        ping:确定何时使用ping()检查连接。默认1,即当连接被取走,做一次ping操作。0是从不ping,1是默认,2是当该连接创建游标时ping,4是执行sql语句时ping,7是总是ping
        :return:
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=0,
                mincached=0,
                maxcached=0,
                maxshared=0,
                host=self.host,
                port=self.port,
                user=self.account,
                password=self.password,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor,
                autocommit=False,
                ping=7,
                blocking=True
            )
            return pool
        except Exception as e:
            time.sleep(120)
            self.init_pool()

日志记录用的是这个:

def get_logger(name,arg,level='info'):
    logger = logging.getLogger(name)
    # 判断目录是否存在,存在不创建,不存在则创建log目录
    if not os.path.exists(arg.log_path):
        os.makedirs(arg.log_path)
    # 设置颜色

    log_colors_config = {
        'DEBUG': 'white',  # cyan white
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
    # 设置日志基础级别
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }
    logger.setLevel(level_relations.get(level))
    # -------------------------------------------
    # 日志输出格式
    console_format = '%(log_color)s [%(asctime)s] [%(name)s] [%(threadName)s %(process)d %(thread)d] [%(filename)s %(funcName)s line:%(lineno)d] %(levelname)s: %(message)s'
    file_format = '[%(asctime)s] [%(name)s] [%(threadName)s %(process)d %(thread)d] [%(filename)s %(funcName)s line:%(lineno)d] %(levelname)s: %(message)s'
    # -------------------------------------------
    console_formatter =  colorlog.ColoredFormatter(console_format,log_colors=log_colors_config)
    # 控制台日志
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(console_formatter)
    # -------------------------------------------
    # info日志文件名:time.strftime('%Y-%m-%d')
    info_file_name = 'LOG-' + time.strftime('%Y-%m-%d', time.localtime(time.time())) + '.log'

    """
    #实例化TimedRotatingFileHandler
    # filename:日志文件名
    # when:日志文件按什么切分。'S'-秒;'M'-分钟;'H'-小时;'D'-天;'W'-周
    #       这里需要注意,如果选择 D-天,那么这个不是严格意义上的'天',是从你
    #       项目启动开始,过了24小时,才会重新创建一个新的日志文件,如果项目重启,
    #       这个时间就会重置。选择'MIDNIGHT'-是指过了凌晨12点,就会创建新的日志
    # interval是时间间隔
    # backupCount:是保留日志个数。默认的0是不会自动删除掉日志。如果超过这个个数,就会自动删除。建议默认。
    """
    file_handler = handlers.TimedRotatingFileHandler(filename=os.path.join(arg.log_path,info_file_name),
                                            when='MIDNIGHT',
                                            interval=1,
                                            backupCount=0,
                                            encoding='utf-8')
    # rotateHandler = ConcurrentRotatingFileHandler(filename=os.path.join(arg.log_path,info_file_name),backupCount=0,encoding='utf-8')
    # 设置文件里写入的格式
    file_formatter = logging.Formatter(file_format)
    file_handler.setFormatter(file_formatter)
    # -------------------------------------------
    # 添加日志处理器
    if not logger.handlers:
        # logger.addHandler(rotateHandler)
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
    # logger.addHandler(file_handler)
    # logger.addHandler(console_handler)
    return logger

报错详情

总进度:   0%|          | 0/2 [00:00<?, ?it/s]
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'module' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\Desktop\规划\weighing_algorithm\main.py", line 204, in <module>
    work(op, logger_sql, logger_procedure,arg)
  File "D:\Desktop\规划\weighing_algorithm\main.py", line 139, in work
    data_results = list(tqdm(process_pool.map(calculation_data, task_parameters), desc="总进度",total=len(task_parameters),position=0))
  File "C:\Users\xxx\anaconda3\envs\learn\lib\site-packages\tqdm\std.py", line 1195, in __iter__
    for obj in iterable:
  File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\process.py", line 562, in _chain_from_iterable_of_lists
    for element in iterable:
  File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 609, in result_iterator
    yield fs.pop().result()
  File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 446, in result
    return self.__get_result()
  File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 391, in __get_result
    raise self._exception
  File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'module' object

进程已结束,退出代码1

有哪位博友可以指导一下吗。

  • 写回答

1条回答 默认 最新

  • 赵4老师 2023-05-05 14:07
    关注
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 10月25日
  • 修改了问题 5月5日
  • 创建了问题 5月5日

悬赏问题

  • ¥15 远程桌面文档内容复制粘贴,格式会变化
  • ¥15 关于#java#的问题:找一份能快速看完mooc视频的代码
  • ¥15 这种微信登录授权 谁可以做啊
  • ¥15 请问我该如何添加自己的数据去运行蚁群算法代码
  • ¥20 用HslCommunication 连接欧姆龙 plc有时会连接失败。报异常为“未知错误”
  • ¥15 网络设备配置与管理这个该怎么弄
  • ¥20 机器学习能否像多层线性模型一样处理嵌套数据
  • ¥20 西门子S7-Graph,S7-300,梯形图
  • ¥50 用易语言http 访问不了网页
  • ¥50 safari浏览器fetch提交数据后数据丢失问题