Cu846 2025-07-31 17:37 采纳率: 0%
浏览 5
已结题

数据库性能监测的脚本

有DBA的朋友,可以帮我看看我的代码是否达到企业级的数据库性能监控要求,可以给我的代码提些建议吗

import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import List, Dict

import pymysql

# 配置日志系统
# Configure the logging system: write to a UTF-8 log file and to the console.
# The log directory is created up front — logging.FileHandler raises
# FileNotFoundError at import time if the parent directory does not exist.
LOG_FILE = Path('数据库性能监测脚本') / 'db_monitor.log'
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class DatabaseMonitor:
    """Monitor for a single MySQL instance.

    Samples connection/concurrency counters, slow queries, total query
    throughput (with a derived QPS), InnoDB buffer-pool statistics and
    transaction counters via ``SHOW GLOBAL STATUS``.
    """

    def __init__(self, db_config: Dict):
        self.db_config = db_config
        # Human-readable identifier; falls back to host:port when no name is given.
        self.instance_name = db_config.get('name', f"{db_config['host']}:{db_config.get('port', 3306)}")
        self.connection = None
        # Protects self.connection against concurrent open/close.
        # NOTE: threading.Lock is NOT re-entrant — never call a method that
        # acquires it while already holding it (see collect_metrics below).
        self.lock = threading.Lock()
        self.connection_status = "未连接"
        # Previous Queries counter / sample timestamp, used to derive QPS
        # from the delta between two consecutive samples.
        self.last_query_count = 0
        self.last_query_time = None

    def connect(self) -> bool:
        """Open the database connection, retrying with exponential backoff.

        Returns True on success; False after all retries failed.
        """
        retry_count = 3
        for i in range(retry_count):
            try:
                with self.lock:
                    self.connection = pymysql.connect(
                        host=self.db_config['host'],
                        port=int(self.db_config.get('port', 3306)),
                        user=self.db_config['user'],
                        password=self.db_config['password'],
                        database=self.db_config.get('database', 'mysql'),
                        charset='utf8mb4',
                        cursorclass=pymysql.cursors.DictCursor,
                        connect_timeout=10,
                        read_timeout=15
                    )
                self.connection_status = "连接成功"
                logger.info(f"[{self.instance_name}] 数据库连接成功")
                return True
            except Exception as e:
                self.connection_status = f"连接失败: {str(e)}"
                logger.warning(f"[{self.instance_name}] 连接失败(第{i + 1}次重试): {str(e)}")
                if i < retry_count - 1:
                    time.sleep(2 ** i)  # backoff: 1s, then 2s
        return False

    def close(self):
        """Close the connection if it is open (safe to call repeatedly)."""
        with self.lock:
            if self.connection and self.connection.open:
                self.connection.close()
                self.connection_status = "已关闭"
                logger.info(f"[{self.instance_name}] 连接已关闭")

    @staticmethod
    def _fetch_status_int(cursor, variable: str) -> int:
        """Fetch one ``SHOW GLOBAL STATUS`` variable and return it as int.

        Returns 0 when the variable is missing or the row is empty (e.g.
        insufficient privileges), instead of crashing on ``None['Value']``.
        """
        cursor.execute("SHOW GLOBAL STATUS LIKE %s", (variable,))
        row = cursor.fetchone()
        if not row or 'Value' not in row:
            return 0
        return int(row['Value'])

    def collect_metrics(self) -> Dict:
        """Collect one sample of performance metrics.

        Returns a dict that always contains ``instance_name``/``timestamp``/
        ``status``; the metric keys are only present when collection succeeded.
        """
        metrics = {
            'instance_name': self.instance_name,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'connection_status': self.connection_status,
            'status': 'OK'
        }

        try:
            # Check liveness under the lock, but (re)connect OUTSIDE it:
            # connect() acquires self.lock itself and the lock is not
            # re-entrant, so calling it while holding the lock deadlocks.
            with self.lock:
                need_connect = not self.connection or not self.connection.open
            if need_connect and not self.connect():
                metrics['status'] = "连接失败"
                return metrics

            with self.connection.cursor() as cursor:
                # 1. Connection / concurrency counters
                metrics['threads_connected'] = self._fetch_status_int(cursor, 'Threads_connected')
                metrics['threads_running'] = self._fetch_status_int(cursor, 'Threads_running')

                # 2. Slow queries (cumulative since server start)
                metrics['slow_queries'] = self._fetch_status_int(cursor, 'Slow_queries')

                # 3. Total query count and derived QPS
                current_queries = self._fetch_status_int(cursor, 'Queries')
                metrics['total_queries'] = current_queries

                current_time = time.time()
                if self.last_query_time is not None:
                    time_diff = current_time - self.last_query_time
                    query_diff = current_queries - self.last_query_count
                    # A negative delta means the server restarted and its
                    # counters were reset; report 0 rather than a bogus value.
                    if time_diff > 0 and query_diff >= 0:
                        metrics['qps'] = round(query_diff / time_diff, 2)
                    else:
                        metrics['qps'] = 0.0
                else:
                    metrics['qps'] = 0.0  # first sample: no baseline yet

                # Remember this sample for the next QPS computation.
                self.last_query_count = current_queries
                self.last_query_time = current_time

                # 4. InnoDB buffer pool
                physical = self._fetch_status_int(cursor, 'Innodb_buffer_pool_reads')
                logical = self._fetch_status_int(cursor, 'Innodb_buffer_pool_read_requests')
                metrics['innodb_physical_reads'] = physical
                metrics['innodb_logical_reads'] = logical
                # Hit rate = 1 - physical/logical: Innodb_buffer_pool_reads
                # counts the read requests that MISSED the pool and went to
                # disk, and is a subset of buffer_pool_read_requests — the
                # previous logical/(logical+physical) formula overstated hits.
                if logical > 0:
                    hit_rate = max(0.0, (1 - physical / logical) * 100)
                    metrics['innodb_buffer_hit_rate'] = f"{hit_rate:.2f}%"
                else:
                    metrics['innodb_buffer_hit_rate'] = "N/A"

                # 5. Transaction counters
                metrics['transactions_committed'] = self._fetch_status_int(cursor, 'Com_commit')
                metrics['transactions_rolled_back'] = self._fetch_status_int(cursor, 'Com_rollback')

        except Exception as e:
            metrics['status'] = f"采集异常: {str(e)}"
            metrics['connection_status'] = f"异常断开: {str(e)}"
            logger.error(f"[{self.instance_name}] 指标采集失败: {str(e)}")

        return metrics


class MultiDatabaseMonitor:
    """多数据库监控管理器"""

    def __init__(self, db_configs: List[Dict], interval: int = 10):
        self.monitors = [DatabaseMonitor(cfg) for cfg in db_configs]
        self.interval = interval
        self.metrics_queue = Queue(maxsize=1000)
        self.running = False
        self.consumer_thread = threading.Thread(target=self._consume_metrics, daemon=True)

    def start(self):
        self.running = True
        self.consumer_thread.start()
        self.show_initial_connection_status()

        for monitor in self.monitors:
            thread = threading.Thread(
                target=self._collect_loop,
                args=(monitor,),
                name=f"monitor-{monitor.instance_name}",
                daemon=True
            )
            thread.start()
        logger.info(f"多数据库监控已启动(共{len(self.monitors)}个实例,采集间隔{self.interval}秒)")

    def show_initial_connection_status(self):
        logger.info("\n===== 数据库初始连接状态 =====")
        for monitor in self.monitors:
            if monitor.connect():
                status = "✅ 连接成功"
            else:
                status = f"❌ {monitor.connection_status}"
            logger.info(f"{monitor.instance_name}: {status}")
        logger.info("===========================\n")

    def _collect_loop(self, monitor: DatabaseMonitor):
        while self.running:
            try:
                metrics = monitor.collect_metrics()
                self.metrics_queue.put(metrics, block=False)
            except Exception as e:
                logger.error(f"[{monitor.instance_name}] 采集循环异常: {str(e)}")
            time.sleep(self.interval)

    def _consume_metrics(self):
        """处理采集到的指标数据,包含查询次数和QPS"""
        while self.running:
            if not self.metrics_queue.empty():
                metrics = self.metrics_queue.get()
                logger.info(f"\n===== {metrics['instance_name']} 监控数据 =====")
                logger.info(f"时间: {metrics['timestamp']}")
                logger.info(f"连接状态: {metrics['connection_status']}")
                logger.info(f"当前连接数: {metrics.get('threads_connected', 'N/A')}")
                logger.info(f"活跃连接数: {metrics.get('threads_running', 'N/A')}")
                logger.info(f"慢查询总数: {metrics.get('slow_queries', 'N/A')}")
                # 新增:查询次数相关指标
                logger.info(f"累计总查询次数: {metrics.get('total_queries', 'N/A')}")
                logger.info(f"每秒查询量(QPS): {metrics.get('qps', 'N/A')}")
                # 其他原有指标
                logger.info(f"InnoDB物理读: {metrics.get('innodb_physical_reads', 'N/A')}")
                logger.info(f"InnoDB逻辑读: {metrics.get('innodb_logical_reads', 'N/A')}")
                logger.info(f"缓冲池命中率: {metrics.get('innodb_buffer_hit_rate', 'N/A')}")
                logger.info(f"已提交事务: {metrics.get('transactions_committed', 'N/A')}")
                logger.info(f"已回滚事务: {metrics.get('transactions_rolled_back', 'N/A')}")
                logger.info("========================================\n")
                self.metrics_queue.task_done()
            time.sleep(1)

    def stop(self):
        self.running = False
        for monitor in self.monitors:
            monitor.close()
        self.consumer_thread.join()
        logger.info("多数据库监控已停止")


if __name__ == "__main__":
    # SECURITY: credentials are hard-coded here for demonstration only —
    # in production load them from a config file or environment variables,
    # and use a least-privilege monitoring account instead of root.
    DB_CONFIGS = [
        {
            'name': 'employees',
            'host': '172.20.10.4',
            'port': 3306,
            'user': 'root',
            'password': '123456',
            'database': 'mysql'
        }
    ]

    # Bind `monitor` before the try block: the original referenced it in the
    # KeyboardInterrupt handler, which is a NameError if the interrupt lands
    # before construction completes.
    monitor = None
    try:
        monitor = MultiDatabaseMonitor(DB_CONFIGS, interval=10)
        monitor.start()
        while True:
            time.sleep(3600)  # main thread idles; all workers are daemons
    except KeyboardInterrupt:
        if monitor is not None:
            monitor.stop()
        logger.info("用户中断,监控已停止")



谢谢

  • 写回答

4条回答 默认 最新

  • 檀越@新空间 2025-07-31 17:38
    关注

    下午好🌅🌅🌅
    本答案参考通义千问

    你的脚本已经具备一定的数据库性能监控基础功能,包括连接管理、日志记录、多线程支持和基本的指标采集。不过,为了满足企业级数据库性能监控的要求,还需要进一步优化和增强,特别是在稳定性、可扩展性、数据准确性、报警机制、安全性、性能开销控制等方面。


    一、当前代码存在的问题与改进方向

    1. QPS计算逻辑不够健壮

    QPS 的计算和保存逻辑虽然已经写出,但没有对计数器回绕(服务器重启后 Queries 归零导致差值为负)或 fetchone() 返回空结果做保护,可能得到负的 QPS 或在采集时抛出异常。

    2. 缺乏错误处理机制

    虽然你有 try-except 块,但在执行 SQL 查询时可能仍然会因为语法错误或权限不足而崩溃。

    3. 缺乏报警机制

    企业级监控通常需要在指标异常时发出警报(如邮件、短信、Slack通知等)。

    4. 缺少配置管理

    目前的配置是硬编码的,建议使用配置文件(如 YAML 或 JSON)来管理数据库连接信息。

    5. 性能开销较大

    频繁地执行 SHOW GLOBAL STATUS 可能会对数据库造成一定压力,尤其在高并发场景下。

    6. 缺乏历史数据存储

    目前只是打印和记录日志,没有将数据持久化到数据库或时间序列数据库(如 Prometheus、InfluxDB 等)。


    二、优化建议与解决方案

    1. 完善QPS计算逻辑

                    # 计算每秒查询量(QPS)
                    current_time = time.time()
                    if self.last_query_count > 0 and self.last_query_time:
                        time_diff = current_time - self.last_query_time
                        query_diff = current_queries - self.last_query_count
                        if time_diff > 0:
                            qps = query_diff / time_diff
                            metrics['qps'] = round(qps, 2)
                        else:
                            metrics['qps'] = 0
                    else:
                        metrics['qps'] = 0
    
                    # 更新上次查询次数和时间
                    self.last_query_count = current_queries
                    self.last_query_time = current_time
    

    重点: 完善 QPS 计算,确保每次采集都更新上次的查询计数和时间,以保证后续计算准确。


    2. 添加更完善的错误处理

                    # 3. 查询次数指标(新增)
                    cursor.execute("SHOW GLOBAL STATUS LIKE 'Queries'")
                    result = cursor.fetchone()
                    if not result or 'Value' not in result:
                        logger.warning(f"[{self.instance_name}] 无法获取 Queries 指标")
                        metrics['total_queries'] = 0
                    else:
                        current_queries = int(result['Value'])
                        metrics['total_queries'] = current_queries
    

    重点: 对 SQL 查询结果进行有效性检查,防止因字段缺失导致程序崩溃。


    3. 添加报警机制(示例)

    你可以使用 smtplib 发送邮件,或调用第三方 API 发送告警。

    def send_alert(message):
        """发送告警消息(示例:邮件)"""
        import smtplib
        from email.mime.text import MIMEText
    
        msg = MIMEText(message)
        msg['Subject'] = '数据库性能异常告警'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'admin@example.com'
    
        with smtplib.SMTP('smtp.example.com') as server:
            server.sendmail(msg['From'], msg['To'], msg.as_string())
    

    重点: 在关键指标(如慢查询突增、连接失败、QPS下降)超过阈值时触发报警。


    4. 引入配置文件管理

    使用 config.yaml 文件管理数据库连接信息:

    # config.yaml
    databases:
      - name: "MySQL-Prod"
        host: "192.168.1.10"
        port: 3306
        user: "monitor_user"
        password: "secure_password"
        database: "mysql"
    

    然后读取配置文件:

    import yaml
    
    with open('config.yaml', 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
        db_configs = config.get('databases', [])
    

    重点: 配置集中管理,便于维护和部署。


    5. 优化性能监控频率

    避免频繁执行 SHOW GLOBAL STATUS,可以设置一个合理的采集间隔(如每 10 秒一次)。

    def run(self):
        while True:
            try:
                self.collect_metrics()
            except Exception as e:
                logger.error(f"[{self.instance_name}] 收集指标时出错: {str(e)}")
            time.sleep(10)  # 每10秒采集一次
    

    重点: 控制采集频率,减少对数据库的负载影响。


    6. 持久化监控数据

    可以将监控数据存储到数据库中,或者使用时间序列数据库(如 InfluxDB)。

    from influxdb import InfluxDBClient
    
    class InfluxWriter:
        def __init__(self, host, port, user, password, db_name):
            self.client = InfluxDBClient(host, port, user, password, db_name)
    
        def write_metric(self, metric):
            point = {
                "measurement": "db_metrics",
                "tags": {"instance": metric['instance_name']},
                "fields": {
                    "threads_connected": metric.get('threads_connected', 0),
                    "threads_running": metric.get('threads_running', 0),
                    "slow_queries": metric.get('slow_queries', 0),
                    "qps": metric.get('qps', 0),
                    "status": metric.get('status')
                },
                "time": datetime.now().isoformat()
            }
            self.client.write_points([point])
    

    重点: 将监控数据存储到时序数据库,便于长期分析和可视化。


    三、最终优化后的代码片段(部分)

    修改后的 collect_metrics 方法:

    def collect_metrics(self) -> Dict:
        metrics = {
            'instance_name': self.instance_name,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'connection_status': self.connection_status,
            'status': 'OK'
        }
    
        try:
            with self.lock:
                if not self.connection or not self.connection.open:
                    if not self.connect():
                        metrics['status'] = "连接失败"
                        return metrics
    
            with self.connection.cursor() as cursor:
                # 1. 连接与并发指标
                cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
                metrics['threads_connected'] = int(cursor.fetchone()['Value'])
    
                cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_running'")
                metrics['threads_running'] = int(cursor.fetchone()['Value'])
    
                # 2. 慢查询指标
                cursor.execute("SHOW GLOBAL STATUS LIKE 'Slow_queries'")
                metrics['slow_queries'] = int(cursor.fetchone()['Value'])
    
                # 3. 查询次数指标(新增)
                cursor.execute("SHOW GLOBAL STATUS LIKE 'Queries'")
                result = cursor.fetchone()
                if not result or 'Value' not in result:
                    logger.warning(f"[{self.instance_name}] 无法获取 Queries 指标")
                    metrics['total_queries'] = 0
                else:
                    current_queries = int(result['Value'])
                    metrics['total_queries'] = current_queries
    
                    # 计算每秒查询量(QPS)
                    current_time = time.time()
                    if self.last_query_count > 0 and self.last_query_time:
                        time_diff = current_time - self.last_query_time
                        query_diff = current_queries - self.last_query_count
                        if time_diff > 0:
                            qps = query_diff / time_diff
                            metrics['qps'] = round(qps, 2)
                        else:
                            metrics['qps'] = 0
                    else:
                        metrics['qps'] = 0
    
                    # 更新上次查询次数和时间
                    self.last_query_count = current_queries
                    self.last_query_time = current_time
    
        except Exception as e:
            logger.error(f"[{self.instance_name}] 收集指标时发生错误: {str(e)}")
            metrics['status'] = "采集失败"
    
        return metrics
    

    四、总结

    | 项目 | 是否符合企业级要求 | 说明 |
    |------|------------------|------|
    | 日志系统 | ✅ | 已支持文件和控制台输出 |
    | 多线程安全 | ✅ | 使用了锁机制 |
    | 错误处理 | ⚠️ | 需要进一步完善 |
    | 报警机制 | ❌ | 缺少 |
    | 配置管理 | ❌ | 应使用外部配置文件 |
    | 数据持久化 | ❌ | 缺少 |
    | 性能开销 | ⚠️ | 频率过高,需优化 |


    如果你希望我帮你整合完整的版本(包含配置加载、报警、数据存储等功能),也可以告诉我,我可以提供完整的代码结构。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 7月31日
  • 创建了问题 7月31日