Problem Description
Python raises an error when the program runs with multiprocessing.
Problem Details
The error only appears when the program runs with multiple processes. The program writes logs and uses a PooledDB connection pool; both are already guarded by locks. Without multiprocessing, it runs normally.
The database connection is set up like this:
import time

import pymysql
from threading import Lock
from DBUtils.PooledDB import PooledDB, SharedDBConnection


class DataBase:
    def __init__(self, arg):
        self.account = arg.sql_account
        self.password = arg.sql_password
        self.host = arg.sql_host
        self.port = arg.sql_port
        self.db = arg.sql_db
        # ---------------------------------------------------------
        self.table_input = arg.table_input
        self.table_output_main = arg.table_output_main
        self.table_output_sub = arg.table_output_sub
        # ---------------------------------------------------------
        self.pool = self.init_pool()
        self.lock = Lock()

    def init_pool(self):
        """
        creator: the database driver module, e.g. pymysql, pymssql, cx_Oracle. No default.
        mincached: connections created at pool initialization. Defaults to 0, i.e. none are created up front. (Recommended: keep 0; with a non-zero value the whole project fails to start whenever the database is unavailable.)
        maxcached: maximum number of idle connections kept in the pool. Defaults to 0, i.e. no limit. (Recommended: keep the default.)
        maxshared: maximum number of shared connections. Defaults to 0, i.e. every connection is dedicated, not shared. (Rarely used; keep the default.)
        maxconnections: maximum number of connections allowed. Defaults to 0, i.e. no limit. (Set according to the workload.)
        blocking: whether requests block once the connection limit is reached. Defaults to False, i.e. asking for another connection then raises an error. (Recommended: True, so new requests wait until a connection is freed.)
        maxusage: maximum number of times a single connection may be reused. Defaults to 0, i.e. no limit. (Recommended: keep the default.)
        setsession: optional list of SQL commands used to prepare the session, e.g. setting the time zone.
        reset: how a connection is reset when it returns to the pool. Defaults to True, which always rolls back. (Keep the default.)
        ping: when to check connections with ping(). 0 = never, 1 = whenever a connection is fetched from the pool (the default), 2 = when a cursor is created, 4 = when a statement is executed, 7 = always.
        :return: the connection pool
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=0,
                mincached=0,
                maxcached=0,
                maxshared=0,
                host=self.host,
                port=self.port,
                user=self.account,
                password=self.password,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor,
                autocommit=False,
                ping=7,
                blocking=True
            )
            return pool
        except Exception as e:
            # retry after a pause; note the original version was missing this
            # return, so the caller received None whenever the retry path ran
            time.sleep(120)
            return self.init_pool()
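For context: PooledDB keeps a reference to the driver module passed as creator (here pymysql), so a DataBase instance cannot be pickled, and the threading.Lock stored on it is likewise unpicklable. If such an object ends up in the arguments sent to worker processes, pickling fails. A common workaround is to build one pool per worker process via the executor's initializer hook instead of shipping the object across. A minimal sketch of that idea follows; _db and _init_worker are illustrative names, not from my program, and calculation_data stands in for the worker entry point seen in the traceback:

from concurrent.futures import ProcessPoolExecutor

_db = None  # process-local instance, set once per worker

def _init_worker(arg):
    # runs once inside each worker process, so nothing unpicklable crosses
    # the process boundary (arg itself must still be picklable)
    global _db
    _db = DataBase(arg)

def calculation_data(task):
    # use the process-local pool instead of one passed in via the task
    conn = _db.pool.connection()
    try:
        pass  # per-task work would go here
    finally:
        conn.close()

# usage sketch:
# with ProcessPoolExecutor(initializer=_init_worker, initargs=(arg,)) as process_pool:
#     results = list(process_pool.map(calculation_data, task_parameters))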
Logging is set up like this:
import os
import time
import logging
from logging import handlers

import colorlog


def get_logger(name, arg, level='info'):
    logger = logging.getLogger(name)
    # create the log directory if it does not exist yet
    if not os.path.exists(arg.log_path):
        os.makedirs(arg.log_path)
    # console colors
    log_colors_config = {
        'DEBUG': 'white',  # cyan / white
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
    # base log level
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }
    logger.setLevel(level_relations.get(level))
    # -------------------------------------------
    # output formats
    console_format = '%(log_color)s [%(asctime)s] [%(name)s] [%(threadName)s %(process)d %(thread)d] [%(filename)s %(funcName)s line:%(lineno)d] %(levelname)s: %(message)s'
    file_format = '[%(asctime)s] [%(name)s] [%(threadName)s %(process)d %(thread)d] [%(filename)s %(funcName)s line:%(lineno)d] %(levelname)s: %(message)s'
    # -------------------------------------------
    console_formatter = colorlog.ColoredFormatter(console_format, log_colors=log_colors_config)
    # console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(console_formatter)
    # -------------------------------------------
    # info log file name, e.g. LOG-2023-01-01.log
    info_file_name = 'LOG-' + time.strftime('%Y-%m-%d', time.localtime(time.time())) + '.log'
    """
    TimedRotatingFileHandler parameters
    filename: log file name
    when: rotation unit. 'S' = seconds; 'M' = minutes; 'H' = hours; 'D' = days; 'W' = weeks.
          Note that 'D' is not a calendar day: a new file is created 24 hours
          after the process starts, and the timer resets on every restart.
          'MIDNIGHT' creates a new file once midnight has passed.
    interval: rotation interval
    backupCount: number of rotated files to keep. The default 0 never deletes
                 old logs; any other value deletes files beyond that count.
    """
    file_handler = handlers.TimedRotatingFileHandler(filename=os.path.join(arg.log_path, info_file_name),
                                                     when='MIDNIGHT',
                                                     interval=1,
                                                     backupCount=0,
                                                     encoding='utf-8')
    # rotateHandler = ConcurrentRotatingFileHandler(filename=os.path.join(arg.log_path, info_file_name), backupCount=0, encoding='utf-8')
    # file output format
    file_formatter = logging.Formatter(file_format)
    file_handler.setFormatter(file_formatter)
    # -------------------------------------------
    # attach handlers only once per logger name
    if not logger.handlers:
        # logger.addHandler(rotateHandler)
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
    return logger
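A side note on the logging part: TimedRotatingFileHandler is not safe when several processes write to the same file, which is presumably why the ConcurrentRotatingFileHandler line above was tried. The standard-library alternative routes worker records through a queue to a single writing process. A minimal sketch, assuming a queue created in the main process (log_queue and _worker_log_setup are illustrative names, not my current setup):

import logging
import logging.handlers
import multiprocessing

def _worker_log_setup(log_queue):
    # in each worker: forward records to the main process instead of writing files
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(log_queue)]
    root.setLevel(logging.INFO)

if __name__ == '__main__':
    log_queue = multiprocessing.Queue()
    file_handler = logging.handlers.TimedRotatingFileHandler(
        'worker.log', when='MIDNIGHT', encoding='utf-8')
    # a single listener in the main process does all file writing
    listener = logging.handlers.QueueListener(log_queue, file_handler)
    listener.start()
    # pass log_queue to workers, e.g. via initializer=_worker_log_setup, initargs=(log_queue,)
    listener.stop()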
Error Details
总进度: 0%| | 0/2 [00:00<?, ?it/s]
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'module' object
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\Desktop\规划\weighing_algorithm\main.py", line 204, in <module>
work(op, logger_sql, logger_procedure,arg)
File "D:\Desktop\规划\weighing_algorithm\main.py", line 139, in work
data_results = list(tqdm(process_pool.map(calculation_data, task_parameters), desc="总进度",total=len(task_parameters),position=0))
File "C:\Users\xxx\anaconda3\envs\learn\lib\site-packages\tqdm\std.py", line 1195, in __iter__
for obj in iterable:
File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\process.py", line 562, in _chain_from_iterable_of_lists
for element in iterable:
File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 609, in result_iterator
yield fs.pop().result()
File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 446, in result
return self.__get_result()
File "C:\Users\xxx\anaconda3\envs\learn\lib\concurrent\futures\_base.py", line 391, in __get_result
raise self._exception
File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "C:\Users\xxx\anaconda3\envs\learn\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'module' object
Process finished with exit code 1
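From the traceback, the failure happens while the task arguments are being pickled onto the feed queue, and any object holding a module reference reproduces the same error. A minimal reproduction sketch (Holder is an illustrative name):

import pickle
import pymysql

class Holder:
    def __init__(self):
        # a module object stored as an attribute, like the creator module
        # that PooledDB keeps on the pool
        self.driver = pymysql

pickle.dumps(Holder())  # TypeError: cannot pickle 'module' object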
Could anyone offer some guidance on this?