有 DBA 朋友能帮我看看,我的代码是否达到了企业级数据库性能监控的要求吗?也请给我的代码提些改进建议。
import logging
import os
import signal
import threading
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from queue import Empty, Full, Queue
from typing import Dict, List

import pymysql
# Logging: console plus a size-rotated log file, so a long-running monitor
# cannot fill the disk. The file location can be overridden with the
# DB_MONITOR_LOG_FILE environment variable.
LOG_FILE = os.environ.get('DB_MONITOR_LOG_FILE', '数据库性能监测脚本/db_monitor.log')
_log_dir = os.path.dirname(LOG_FILE)
if _log_dir:
    # FileHandler does not create missing directories; without this the script
    # crashes at import time on a machine where the directory does not exist.
    os.makedirs(_log_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    handlers=[
        # Rotate at 50 MB, keeping 10 old files.
        RotatingFileHandler(LOG_FILE, maxBytes=50 * 1024 * 1024, backupCount=10, encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class DatabaseMonitor:
    """Monitor for a single MySQL instance.

    Keeps one pymysql connection (re-established on demand) and turns the
    server's cumulative ``SHOW GLOBAL STATUS`` counters into a flat metrics
    dict. QPS is derived from two consecutive samples of ``Queries``.
    """

    # Metric key -> MySQL global status variable. All of them are read with a
    # single SHOW GLOBAL STATUS statement per sample instead of one statement
    # per variable: fewer round trips, and the monitor's own traffic adds one
    # statement to the Queries counter instead of eight.
    # NOTE(review): per the MySQL manual Com_commit / Com_rollback count the
    # COMMIT / ROLLBACK *statements* executed, so autocommit transactions are
    # likely not included -- confirm before alerting on them.
    STATUS_VARIABLES = {
        'threads_connected': 'Threads_connected',
        'threads_running': 'Threads_running',
        'slow_queries': 'Slow_queries',
        'total_queries': 'Queries',
        'innodb_physical_reads': 'Innodb_buffer_pool_reads',
        'innodb_logical_reads': 'Innodb_buffer_pool_read_requests',
        'transactions_committed': 'Com_commit',
        'transactions_rolled_back': 'Com_rollback',
    }

    def __init__(self, db_config: Dict):
        """
        Args:
            db_config: connection settings. ``host``, ``user`` and
                ``password`` are required; ``port`` (default 3306),
                ``database`` (default ``'mysql'``), ``connect_timeout``
                (default 10), ``read_timeout`` (default 15) and ``name``
                (display name, default ``host:port``) are optional.
        """
        self.db_config = db_config
        self.instance_name = db_config.get('name', f"{db_config['host']}:{db_config.get('port', 3306)}")
        self.connection = None
        # Must be re-entrant: collect_metrics() holds the lock while calling
        # connect(), which acquires it again. A plain Lock deadlocks there.
        self.lock = threading.RLock()
        self.connection_status = "未连接"
        # Previous Queries sample and the monotonic time it was taken, used to
        # compute QPS; last_query_time stays None until the first sample.
        self.last_query_count = 0
        self.last_query_time = None

    def connect(self) -> bool:
        """Open a fresh connection, retrying up to 3 times with backoff (1 s, 2 s).

        Any existing connection is closed first so a reconnect never leaks a
        socket.

        Returns:
            True once connected, False if every attempt failed. Either way
            ``connection_status`` describes the last outcome.
        """
        retry_count = 3
        for attempt in range(retry_count):
            try:
                with self.lock:
                    self._discard_connection()
                    self.connection = pymysql.connect(
                        host=self.db_config['host'],
                        port=int(self.db_config.get('port', 3306)),
                        user=self.db_config['user'],
                        password=self.db_config['password'],
                        database=self.db_config.get('database', 'mysql'),
                        charset='utf8mb4',
                        cursorclass=pymysql.cursors.DictCursor,
                        connect_timeout=int(self.db_config.get('connect_timeout', 10)),
                        read_timeout=int(self.db_config.get('read_timeout', 15)),
                        # A monitoring session should never sit inside an open
                        # transaction between samples.
                        autocommit=True,
                    )
                    self.connection_status = "连接成功"
                logger.info(f"[{self.instance_name}] 数据库连接成功")
                return True
            except Exception as e:
                self.connection_status = f"连接失败: {str(e)}"
                logger.warning(f"[{self.instance_name}] 连接失败(第{attempt + 1}次重试): {str(e)}")
                if attempt < retry_count - 1:
                    time.sleep(2 ** attempt)
        return False

    def close(self):
        """Close the connection if it is open; safe to call repeatedly."""
        with self.lock:
            if self.connection and self.connection.open:
                self.connection.close()
                self.connection_status = "已关闭"
                logger.info(f"[{self.instance_name}] 连接已关闭")

    def _discard_connection(self):
        """Drop the current connection without raising (caller holds the lock)."""
        connection, self.connection = self.connection, None
        if connection is not None and connection.open:
            try:
                connection.close()
            except Exception as e:  # best effort: the socket may already be dead
                logger.debug(f"[{self.instance_name}] 关闭旧连接时出错: {str(e)}")

    def collect_metrics(self) -> Dict:
        """Collect one sample of performance metrics from this instance.

        Never raises: failures are reported via the ``status`` and
        ``connection_status`` keys, and a connection that failed mid-query is
        dropped so the next call reconnects instead of failing forever.

        Returns:
            A dict with ``instance_name``, ``timestamp``, ``connection_status``
            and ``status`` (``'OK'`` on success). On success it also holds one
            key per STATUS_VARIABLES entry the server reported, plus ``qps``
            (when ``Queries`` was reported) and ``innodb_buffer_hit_rate``.
        """
        metrics = {
            'instance_name': self.instance_name,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'connection_status': self.connection_status,
            'status': 'OK'
        }
        try:
            with self.lock:
                if not self.connection or not self.connection.open:
                    connected = self.connect()
                    # Report the status after the reconnect attempt rather than
                    # the value captured before it.
                    metrics['connection_status'] = self.connection_status
                    if not connected:
                        metrics['status'] = "连接失败"
                        return metrics
                status = self._fetch_global_status()
                self._fill_metrics(metrics, status)
        except Exception as e:
            metrics['status'] = f"采集异常: {str(e)}"
            metrics['connection_status'] = f"异常断开: {str(e)}"
            logger.error(f"[{self.instance_name}] 指标采集失败: {str(e)}")
            with self.lock:
                # The connection may be half-broken (e.g. after a read timeout);
                # drop it so the next cycle reconnects cleanly.
                self.connection_status = metrics['connection_status']
                self._discard_connection()
        return metrics

    def _fetch_global_status(self) -> Dict[str, int]:
        """Read every STATUS_VARIABLES entry in one query (caller holds the lock).

        Returns a dict keyed by lower-cased variable name; variables the
        server does not report are simply absent.
        """
        names = list(self.STATUS_VARIABLES.values())
        placeholders = ', '.join(['%s'] * len(names))
        with self.connection.cursor() as cursor:
            cursor.execute(
                f"SHOW GLOBAL STATUS WHERE Variable_name IN ({placeholders})",
                names,
            )
            rows = cursor.fetchall()
        # Key by lower-case name so the lookup does not depend on how the
        # server capitalises Variable_name.
        return {row['Variable_name'].lower(): int(row['Value']) for row in rows}

    def _fill_metrics(self, metrics: Dict, status: Dict[str, int]) -> None:
        """Copy raw counters into *metrics* and derive QPS and buffer-pool hit rate."""
        for key, variable in self.STATUS_VARIABLES.items():
            value = status.get(variable.lower())
            if value is not None:
                metrics[key] = value

        total_queries = metrics.get('total_queries')
        if total_queries is not None:
            metrics['qps'] = self._compute_qps(total_queries)

        logical = metrics.get('innodb_logical_reads')
        physical = metrics.get('innodb_physical_reads')
        if logical is not None and physical is not None and logical > 0:
            # read_requests counts all logical reads; reads counts the subset
            # that missed the pool and went to disk. Hence
            # hit rate = (requests - misses) / requests.
            # NOTE(review): both counters are cumulative since server start,
            # so this is a lifetime average, not the rate for the last interval.
            hit_rate = (logical - physical) / logical * 100
            metrics['innodb_buffer_hit_rate'] = f"{hit_rate:.2f}%"
        else:
            metrics['innodb_buffer_hit_rate'] = "N/A"

    def _compute_qps(self, current_queries: int) -> float:
        """Return queries/second since the previous sample and remember this one.

        Returns 0.0 for the first sample and whenever the counter went
        backwards (e.g. after a server restart), since no valid rate exists.
        """
        now = time.monotonic()  # unaffected by wall-clock adjustments
        qps = 0.0
        if self.last_query_time is not None:
            elapsed = now - self.last_query_time
            delta = current_queries - self.last_query_count
            if elapsed > 0 and delta >= 0:
                qps = round(delta / elapsed, 2)
        self.last_query_count = current_queries
        self.last_query_time = now
        return qps
class MultiDatabaseMonitor:
    """Monitor several database instances concurrently.

    One collector thread per instance samples metrics every ``interval``
    seconds and pushes them onto a bounded queue; a single consumer thread
    drains the queue and logs each sample.
    """

    def __init__(self, db_configs: List[Dict], interval: int = 10):
        """
        Args:
            db_configs: one connection dict per instance.
            interval: seconds between two samples of the same instance.
        """
        self.monitors = [DatabaseMonitor(cfg) for cfg in db_configs]
        self.interval = interval
        self.metrics_queue = Queue(maxsize=1000)
        self.running = False
        # Set by stop() so waiting threads wake immediately instead of
        # finishing their current interval.
        self._stop_event = threading.Event()
        self.collector_threads: List[threading.Thread] = []
        self.consumer_thread = threading.Thread(
            target=self._consume_metrics, name="metrics-consumer", daemon=True
        )

    def start(self):
        """Start the consumer, report initial connectivity, then one collector per instance."""
        self.running = True
        self._stop_event.clear()
        self.consumer_thread.start()
        self.show_initial_connection_status()
        for monitor in self.monitors:
            thread = threading.Thread(
                target=self._collect_loop,
                args=(monitor,),
                name=f"monitor-{monitor.instance_name}",
                daemon=True
            )
            thread.start()
            self.collector_threads.append(thread)
        logger.info(f"多数据库监控已启动(共{len(self.monitors)}个实例,采集间隔{self.interval}秒)")

    def show_initial_connection_status(self):
        """Connect to every instance once and log whether it succeeded."""
        logger.info("\n===== 数据库初始连接状态 =====")
        for monitor in self.monitors:
            if monitor.connect():
                status = "✅ 连接成功"
            else:
                status = f"❌ {monitor.connection_status}"
            logger.info(f"{monitor.instance_name}: {status}")
        logger.info("===========================\n")

    def _collect_loop(self, monitor: "DatabaseMonitor"):
        """Sample *monitor* every ``interval`` seconds until stop() is called."""
        next_run = time.monotonic()
        while self.running:
            try:
                metrics = monitor.collect_metrics()
                self.metrics_queue.put(metrics, block=False)
            except Full:
                # Consumer is behind: drop this sample rather than block the
                # collector and skew its schedule.
                logger.warning(f"[{monitor.instance_name}] 指标队列已满,丢弃本次采集数据")
            except Exception as e:
                logger.error(f"[{monitor.instance_name}] 采集循环异常: {str(e)}")
            # Schedule on a fixed timeline so the period stays `interval`
            # instead of drifting to interval + collection time.
            now = time.monotonic()
            next_run += self.interval
            if next_run < now:
                next_run = now  # fell behind (e.g. slow reconnect): don't burst to catch up
            # wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(next_run - now):
                break

    def _consume_metrics(self):
        """Log queued samples until stop() is called.

        Blocks on the queue (up to 1 s, so stop() is noticed) rather than
        sleeping a fixed second after every item, which capped throughput at
        one sample per second and let the queue fill with many instances.
        """
        while self.running:
            try:
                metrics = self.metrics_queue.get(timeout=1)
            except Empty:
                continue
            try:
                self._log_metrics(metrics)
            except Exception as e:
                # One malformed sample must not kill the consumer thread.
                logger.error(f"处理监控数据失败: {str(e)}")
            finally:
                self.metrics_queue.task_done()

    def _log_metrics(self, metrics: Dict):
        """Write one sample to the log, using 'N/A' for missing metrics."""
        logger.info(f"\n===== {metrics['instance_name']} 监控数据 =====")
        logger.info(f"时间: {metrics['timestamp']}")
        logger.info(f"连接状态: {metrics['connection_status']}")
        # Surface collection failures, which were previously never logged here.
        logger.info(f"采集状态: {metrics.get('status', 'N/A')}")
        logger.info(f"当前连接数: {metrics.get('threads_connected', 'N/A')}")
        logger.info(f"活跃连接数: {metrics.get('threads_running', 'N/A')}")
        logger.info(f"慢查询总数: {metrics.get('slow_queries', 'N/A')}")
        logger.info(f"累计总查询次数: {metrics.get('total_queries', 'N/A')}")
        logger.info(f"每秒查询量(QPS): {metrics.get('qps', 'N/A')}")
        logger.info(f"InnoDB物理读: {metrics.get('innodb_physical_reads', 'N/A')}")
        logger.info(f"InnoDB逻辑读: {metrics.get('innodb_logical_reads', 'N/A')}")
        logger.info(f"缓冲池命中率: {metrics.get('innodb_buffer_hit_rate', 'N/A')}")
        logger.info(f"已提交事务: {metrics.get('transactions_committed', 'N/A')}")
        logger.info(f"已回滚事务: {metrics.get('transactions_rolled_back', 'N/A')}")
        logger.info("========================================\n")

    def stop(self):
        """Stop all threads and close every connection; blocks until done."""
        self.running = False
        self._stop_event.set()
        # Let collectors finish their current cycle before closing connections;
        # a collector still running would otherwise simply reconnect.
        for thread in self.collector_threads:
            thread.join()
        for monitor in self.monitors:
            monitor.close()
        if self.consumer_thread.is_alive():
            self.consumer_thread.join()
        logger.info("多数据库监控已停止")
if __name__ == "__main__":
    # Connection settings can come from environment variables so credentials
    # need not live in source control; the literals are only fallbacks.
    # NOTE(review): monitoring should use a dedicated low-privilege account
    # rather than root.
    DB_CONFIGS = [
        {
            'name': 'employees',
            'host': os.environ.get('DB_MONITOR_HOST', '172.20.10.4'),
            'port': int(os.environ.get('DB_MONITOR_PORT', '3306')),
            'user': os.environ.get('DB_MONITOR_USER', 'root'),
            'password': os.environ.get('DB_MONITOR_PASSWORD', '123456'),
            'database': 'mysql'
        }
    ]
    # Treat SIGTERM (e.g. systemd / docker stop) like Ctrl+C so the monitor
    # shuts down through stop() instead of being killed mid-collection.
    signal.signal(signal.SIGTERM, signal.default_int_handler)
    # Built outside the try so the handler below can always reference it.
    monitor = MultiDatabaseMonitor(DB_CONFIGS, interval=10)
    try:
        monitor.start()
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        monitor.stop()
        logger.info("用户中断,监控已停止")
谢谢