我项目使用锁,当把正在分析的任务(每个任务对应一个Qthread)通过teminate中止后,如果锁没有释放,待执行的任务开始执行后会被卡住,无法执行下去,Qthread中的analyse方法执行时间要10分钟以上,如何正确的中止Qthread, 使它立即结束运行,并释放他内部的锁啊?
上个任务中止后,锁没有释放,当前任务一直被卡住,十几分钟了,一动不动,也不报错
# 执行操作的线程
class WorkerThread(QThread):
_signal = pyqtSignal(object)
def __init__(self, param=None):
super().__init__()
self.param = param
def run(self):
try:
logger.info('running')
analyse(self.param)
self._signal.emit(self.param)
except Exception as e:
traceback.print_exc()
analyse_q.put(logMsg("操作异常"))
logger.error(traceback.format_exc())
class PythonToJS(QObject):
mySqlite = SqliteTool()
def __init__(self, view):
super().__init__()
self.view = view
self.th = None
# 启动任务
def run(self, reportId):
try:
print('run')
self.th = WorkerThread(reportId)
self.th._signal.connect(self.finished)
self.th.start()
except Exception as e:
print('e', e)
logger.error(traceback.format_exc())
# 终止任务
@pyqtSlot(str)
def terminate(self, s):
print('terminate', s)
logger.info('terminate')
try:
product = json.loads(s)['parameter']
if product:
remove_task(task_q, product['reportId'])
if product['analyseStatus'] == 1:
if self.th:
self.th.terminate()
self.th = None
remove_task(running_q, product['reportId'])
analyse_q.put(logMsg(""))
analyse_q.put(logMsg("*******************************"))
analyse_q.put(logMsg("用户终止分析..."))
analyse_q.put(logMsg("*******************************"))
analyse_q.put(logMsg(""))
# 更新状态
self.mySqlite.update(
"update fof_amt_product_report set analyse_status = 4 where report_id = ?", (product['reportId'],))
obj = json.dumps({'ANS_MSG_HDR': {'MSG_CODE': "200", "MSG_TEXT": "success"}},
ensure_ascii=False)
self.view.page().runJavaScript('%s(%s)' % ('onTerminate', obj))
except Exception as e:
print(e)
logger.error(traceback.format_exc())
class SqliteTool(object):
"""
简单sqlite数据库工具类
编写这个类主要是为了封装sqlite,继承此类复用方法
"""
def __init__(self, dbName="data.db", timeout = 10):
"""
初始化连接——使用完需关闭连接
:param dbName: 连接库的名字,注意,以'.db'结尾
"""
self.dbName = dbName
self.lockTimeout = timeout
self._lock = threading.Lock()
@contextmanager
def _locked_cursor(self):
try:
# 连接数据库
self._conn = sqlite3.connect(self.dbName, timeout=self.lockTimeout)
# 开启 WAL 模式, 提升并发性能
self._conn.execute('PRAGMA journal_mode = WAL;')
self._conn.execute("PRAGMA synchronous=NORMAL;")
self._conn.row_factory = dict_factory
# 创建游标
self._cur = self._conn.cursor()
yield self._cur
self._conn.commit()
except Exception as e:
self._conn.rollback()
raise e
finally:
self._cur.close()
self._conn.close()
# 插入或更新表数据,一次插入或更新一条数据
def insert(self, sql: str, value: tuple):
"""
插入或更新单条表记录
:param sql: insert语句或update语句
:param value: 插入或更新的值,形如()
:return: True表示插入或更新成功
"""
try:
with self._lock:
with self._locked_cursor() as cur:
cur.execute(sql, value)
return True
except Exception as e:
print("insert error", e)
return False
# 插入或更新表数据,一次插入或更新一条数据
def update(self, sql: str, value: tuple):
"""
插入或更新单条表记录
:param sql: insert语句或update语句
:param value: 插入或更新的值,形如()
:return: True表示插入或更新成功
"""
with self._lock:
with self._locked_cursor() as cur:
try:
cur.execute(sql, value)
print('update success')
except Exception as e:
print('update error', e)