い夜空ベ梦雨 2025-07-14 10:28 采纳率: 50%
浏览 33

rtsp 推流过程规律6-7分钟性中断问题

我目前在ubuntu上python使用ffmpeg拉流并推流,并使用mediamtx作为流媒体服务,但是出现推流6分钟左右规律性断流,从py程序来看并不是拉流推流的问题,也不是mediamtx流媒体服务问题。因为换了ZlmMediaKit流媒体服务依然出现这个问题,并且在客户端无法拉流后推流程序健康检测系统完全正常。

1、mediamtx日志

img

2、ZlmMediaKit日志

img

3、拉流、推流程序
主程序

import cv2
import signal
import sys
import threading
import time
from datetime import datetime
import queue
import subprocess
import os
import logging

# 导入配置文件
from stream_config import RTSP_URLS, STREAM_MAPPING, FFMPEG_CONFIG, WINDOW_CONFIG, LOG_CONFIG

# 配置日志
logging.basicConfig(
    level=getattr(logging, LOG_CONFIG['level']),
    format=LOG_CONFIG['format'],
    handlers=[
        logging.FileHandler(LOG_CONFIG['file']),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class FFmpegStreamer:
    """FFmpeg推流处理器"""
    def __init__(self, stream_name, input_url, output_url):
        self.stream_name = stream_name
        self.input_url = input_url
        self.output_url = output_url
        self.process = None
        self.running = False
        
    def start(self):
        """启动FFmpeg推流"""
        try:
            # FFmpeg命令参数 - 使用配置文件参数
            cmd = [
                'ffmpeg',
                '-i', self.input_url,  # 输入RTSP流
                '-c:v', FFMPEG_CONFIG['video_codec'],  # 视频编码器
                '-preset', FFMPEG_CONFIG['preset'],  # 编码速度预设
                '-tune', FFMPEG_CONFIG['tune'],  # 调优参数
                '-profile:v', FFMPEG_CONFIG['profile'],  # 编码配置
                '-pix_fmt', FFMPEG_CONFIG['pixel_format'],  # 像素格式
                '-c:a', FFMPEG_CONFIG['audio_codec'],  # 音频编码
                '-f', 'rtsp',  # 输出格式为RTSP
                '-rtsp_transport', FFMPEG_CONFIG['transport'],  # 传输协议
                '-muxdelay', FFMPEG_CONFIG['mux_delay'],  # 缓冲延迟
                self.output_url  # 输出RTSP地址
            ]
            
            print(f"[{self.stream_name}] 启动FFmpeg推流: {self.input_url} -> {self.output_url}")
            
            # 启动FFmpeg进程
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.PIPE
            )
            
            self.running = True
            print(f"[{self.stream_name}] FFmpeg推流已启动")
            return True
            
        except Exception as e:
            print(f"[{self.stream_name}] FFmpeg推流启动失败: {str(e)}")
            return False
            
    def stop(self):
        """停止FFmpeg推流"""
        if self.process and self.running:
            try:
                self.process.terminate()
                self.process.wait(timeout=5)
                print(f"[{self.stream_name}] FFmpeg推流已停止")
            except subprocess.TimeoutExpired:
                self.process.kill()
                print(f"[{self.stream_name}] FFmpeg推流已强制停止")
            except Exception as e:
                print(f"[{self.stream_name}] 停止FFmpeg推流时出错: {str(e)}")
            finally:
                self.running = False
                self.process = None
                
    def is_running(self):
        """检查FFmpeg进程是否正在运行"""
        if self.process:
            return self.process.poll() is None
        return False
    
    def restart(self):
        """重启FFmpeg推流"""
        print(f"[{self.stream_name}] 正在重启FFmpeg推流...")
        self.stop()
        time.sleep(2)  # 等待2秒再重启
        return self.start()
    
    def monitor_health(self):
        """监控推流健康状态并自动重启"""
        retry_count = 0
        max_retries = FFMPEG_CONFIG['max_retries']
        retry_delay = FFMPEG_CONFIG['retry_delay']
        check_interval = FFMPEG_CONFIG['health_check_interval']
        
        logger.info(f"[{self.stream_name}] 健康监控已启动")
        
        while self.running:
            if not self.is_running():
                if retry_count < max_retries:
                    retry_count += 1
                    logger.warning(f"[{self.stream_name}] 推流进程异常退出,正在重启... (尝试 {retry_count}/{max_retries})")
                    if self.restart():
                        retry_count = 0  # 重启成功,重置计数器
                        logger.info(f"[{self.stream_name}] 推流重启成功")
                    else:
                        logger.error(f"[{self.stream_name}] 推流重启失败")
                        time.sleep(retry_delay)  # 失败后等待配置的时间
                else:
                    logger.error(f"[{self.stream_name}] 达到最大重试次数({max_retries}),停止监控")
                    break
            else:
                retry_count = 0  # 运行正常,重置计数器
            
            time.sleep(check_interval)  # 按配置间隔检查

class StreamHandler:
    """单个视频流处理器"""
    def __init__(self, stream_name, rtsp_url):
        self.stream_name = stream_name
        self.rtsp_url = rtsp_url
        self.cap = None
        self.running = True
        self.frame_queue = queue.Queue(maxsize=10)
        self.capture_thread = None
        
        # FPS计算相关
        self.fps_counter = 0
        self.fps_start_time = time.time()
        self.current_fps = 0
        
    def capture_frames(self):
        """在单独线程中捕获视频帧"""
        while self.running and self.cap and self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                print(f"[{self.stream_name}] 视频流中断或结束")
                break
                
            # 如果队列满了,丢弃最旧的帧
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
                    
            # 添加新帧到队列
            try:
                self.frame_queue.put(frame, timeout=0.1)
            except queue.Full:
                pass
                
    def draw_info_on_frame(self, frame):
        """在帧上绘制FPS和时间戳信息"""
        # 获取当前时间戳
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        
        # 计算FPS
        self.fps_counter += 1
        current_time = time.time()
        if current_time - self.fps_start_time >= 1.0:
            self.current_fps = self.fps_counter / (current_time - self.fps_start_time)
            self.fps_counter = 0
            self.fps_start_time = current_time
            
        # 设置文本参数
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.6
        color = (0, 255, 0)  # 绿色
        thickness = 2
        
        # 绘制半透明背景
        overlay = frame.copy()
        cv2.rectangle(overlay, (10, 10), (450, 100), (0, 0, 0), -1)
        frame = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0)
        
        # 绘制流名称
        stream_text = f"Stream: {self.stream_name}"
        cv2.putText(frame, stream_text, (20, 35), font, font_scale, (255, 255, 0), thickness)
        
        # 绘制FPS信息
        fps_text = f"FPS: {self.current_fps:.1f}"
        cv2.putText(frame, fps_text, (20, 60), font, font_scale, color, thickness)
        
        # 绘制时间戳
        timestamp_text = f"Time: {timestamp}"
        cv2.putText(frame, timestamp_text, (20, 85), font, font_scale, color, thickness)
        
        return frame
        
    def get_latest_frame(self):
        """获取最新的处理后帧"""
        frame = None
        # 获取队列中最新的帧,丢弃旧帧
        while not self.frame_queue.empty():
            try:
                frame = self.frame_queue.get_nowait()
            except queue.Empty:
                break
                
        if frame is not None:
            return self.draw_info_on_frame(frame)
        return None
        
    def start(self):
        """启动视频流处理"""
        print(f"[{self.stream_name}] 正在连接RTSP流: {self.rtsp_url}")
        
        try:
            # 连接RTSP流
            self.cap = cv2.VideoCapture(self.rtsp_url)
            if not self.cap.isOpened():
                raise Exception(f"无法连接到RTSP流: {self.rtsp_url}")
                
            print(f"[{self.stream_name}] 连接成功,开始播放视频...")
            
            # 获取视频信息
            fps = self.cap.get(cv2.CAP_PROP_FPS)
            width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            
            print(f"[{self.stream_name}] 视频信息 - 分辨率: {width}x{height}, 原始帧率: {fps:.1f} FPS")
            
            # 启动捕获线程
            self.capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
            self.capture_thread.start()
            
            return True
            
        except Exception as e:
            print(f"[{self.stream_name}] 错误: {str(e)}")
            return False
            
    def stop(self):
        """停止视频流处理"""
        self.running = False
        
        # 等待线程结束
        if self.capture_thread and self.capture_thread.is_alive():
            self.capture_thread.join(timeout=2.0)
            
        if self.cap:
            self.cap.release()
            
        print(f"[{self.stream_name}] 资源已清理")

class MultiStreamViewer:
    def __init__(self):
        self.streams = {}
        self.ffmpeg_streamers = {}  # FFmpeg推流器
        self.running = True
        
        # 设置信号处理器来处理CTRL+C
        signal.signal(signal.SIGINT, self.signal_handler)
        
    def signal_handler(self, sig, frame):
        print("\n检测到CTRL+C,正在退出...")
        self.stop_all_streams()
        
    def start_ffmpeg_streamers(self):
        """启动所有FFmpeg推流器"""
        print("正在启动FFmpeg推流器...")
        
        for stream_name, input_url in RTSP_URLS.items():
            if stream_name in STREAM_MAPPING:
                output_url = STREAM_MAPPING[stream_name]
                ffmpeg_streamer = FFmpegStreamer(stream_name, input_url, output_url)
                
                if ffmpeg_streamer.start():
                    self.ffmpeg_streamers[stream_name] = ffmpeg_streamer
                    
                    # 启动健康监控线程
                    monitor_thread = threading.Thread(
                        target=ffmpeg_streamer.monitor_health, 
                        daemon=True,
                        name=f"Monitor-{stream_name}"
                    )
                    monitor_thread.start()
                    
                    print(f"[{stream_name}] FFmpeg推流启动成功,健康监控已启动")
                else:
                    print(f"[{stream_name}] FFmpeg推流启动失败")
                    
        print(f"成功启动 {len(self.ffmpeg_streamers)} 个FFmpeg推流器")
        
    def start_all_streams(self):
        """启动所有视频流"""
        print("正在启动多路RTSP视频流和FFmpeg推流...")
        print("推流映射:")
        for stream_name, output_url in STREAM_MAPPING.items():
            if stream_name in RTSP_URLS:
                print(f"  {stream_name}: {RTSP_URLS[stream_name]} -> {output_url}")
        print("按CTRL+C或'q'键退出程序")
        print()
        
        # 首先启动FFmpeg推流器
        self.start_ffmpeg_streamers()
        
        # 等待一下让FFmpeg推流器稳定
        time.sleep(2)
        
        # 创建并启动所有流处理器
        for stream_name, rtsp_url in RTSP_URLS.items():
            stream_handler = StreamHandler(stream_name, rtsp_url)
            if stream_handler.start():
                self.streams[stream_name] = stream_handler
            else:
                print(f"[{stream_name}] 启动失败")
                
        if not self.streams:
            print("没有成功启动的视频流,程序退出")
            return
            
        print(f"成功启动 {len(self.streams)} 路视频流")
        
        # 在主线程中创建窗口并显示视频
        windows = {}
        for stream_name in self.streams.keys():
            window_name = f'RTSP视频流 - {stream_name}'
            cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)
            
            # 设置窗口位置(使用配置文件)
            if stream_name in WINDOW_CONFIG:
                pos = WINDOW_CONFIG[stream_name]
                cv2.moveWindow(window_name, pos['x'], pos['y'])
            else:
                cv2.moveWindow(window_name, 50, 50)  # 默认位置
                
            windows[stream_name] = window_name
        
        # 主循环,在主线程中显示所有视频流
        try:
            while self.running and any(stream.running for stream in self.streams.values()):
                # 更新所有窗口的帧
                for stream_name, stream_handler in self.streams.items():
                    if stream_handler.running:
                        frame = stream_handler.get_latest_frame()
                        if frame is not None:
                            cv2.imshow(windows[stream_name], frame)
                            
                        # 检查窗口是否被关闭
                        if cv2.getWindowProperty(windows[stream_name], cv2.WND_PROP_VISIBLE) < 1:
                            stream_handler.running = False
                
                # FFmpeg推流器状态由独立的健康监控线程处理
                
                # 检查按键
                key = cv2.waitKey(30) & 0xFF
                if key == ord('q'):
                    break
                    
                # 检查是否所有流都已停止
                if not any(stream.running for stream in self.streams.values()):
                    print("所有视频流已停止")
                    break
                    
        except KeyboardInterrupt:
            print("\n检测到键盘中断")
        finally:
            self.stop_all_streams()
            
    def stop_all_streams(self):
        """停止所有视频流和FFmpeg推流器"""
        self.running = False
        print("正在停止所有视频流和FFmpeg推流器...")
        
        # 停止所有视频流处理器
        for stream_name, stream_handler in self.streams.items():
            stream_handler.stop()
            
        # 停止所有FFmpeg推流器
        for stream_name, ffmpeg_streamer in self.ffmpeg_streamers.items():
            ffmpeg_streamer.stop()
            
        cv2.destroyAllWindows()
        print("所有资源已清理,程序退出")
        sys.exit(0)

def main():
    viewer = MultiStreamViewer()
    viewer.start_all_streams()

if __name__ == "__main__":
    main()

配置文件

# 流媒体配置文件
# Stream Configuration File

# RTSP源地址配置
RTSP_URLS = {
    "video1": "rtsp://192.168.144.25:8554/video1",
    # "video2": "rtsp://192.168.144.25:8554/video2"
}

# 推流目标地址映射
STREAM_MAPPING = {
    "video1": "rtsp://127.0.0.1:8554/stream/main",
    # "video2": "rtsp://127.0.0.1:8554/stream/thermal"
}

# FFmpeg编码配置
FFMPEG_CONFIG = {
    # 视频编码设置
    'video_codec': 'libx264',        # H.264编码器
    'preset': 'ultrafast',           # 编码速度预设
    'tune': 'zerolatency',          # 零延迟调优
    'profile': 'baseline',           # 基线配置,最大兼容性
    'pixel_format': 'yuv420p',       # 标准像素格式
    
    # 音频编码设置
    'audio_codec': 'copy',           # 音频直接复制
    
    # 传输设置
    'transport': 'tcp',              # 强制使用TCP传输
    'mux_delay': '0.1',             # 减少缓冲延迟
    
    # 重试设置
    'max_retries': 3,               # 最大重试次数
    'retry_delay': 5,               # 重试间隔(秒)
    'health_check_interval': 5,     # 健康检查间隔(秒)
    'heartbeat_drop': 60,           # 心跳丢失阈值
}

# 显示窗口配置
WINDOW_CONFIG = {
    'video1': {'x': 50, 'y': 50},
    'video2': {'x': 650, 'y': 50}
}

# 日志配置
LOG_CONFIG = {
    'level': 'INFO',
    'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    'file': 'stream_monitor.log'
}

  • 写回答

5条回答 默认 最新

  • 紫雾凌寒 新星创作者: 人工智能技术领域 2025-07-14 10:36
    关注

    让 “紫雾凌寒” 来帮你回答,本回答结合 DeepSeek

    如果还有问题,请评论或点击头像私信,如果有帮助到您, 请关注、采纳、点赞

    1. 关键点分析:
      - 推流在6 - 7分钟左右规律性中断。
      - 已排除拉流推流代码以及mediamtx流媒体服务本身的问题。
    2. 解决方案:
      - 网络方面
      • 检查网络连接稳定性。可以使用ping命令持续监测网络状态,例如:
        bash ping -t <流媒体服务器IP>
      • 查看网络带宽使用情况,确保推流过程中有足够的带宽。可以使用iftop等工具监控网络流量,如:
        bash sudo apt-get install iftop iftop
      • 检查网络配置,比如是否存在NAT设备导致的网络问题。如果是通过NAT设备,查看NAT设备的会话超时设置等参数。
      • ffmpeg参数及资源方面
      • 检查ffmpeg推流时的缓冲区设置。例如,增加输出缓冲区大小,在推流命令中可以添加参数:
        bash -bufsize <size> -maxrate <rate>
        其中<size>是缓冲区大小,<rate>是最大码率。比如:
        bash ffmpeg -i <input_rtsp_url> -c:v copy -c:a aac -bufsize 1024k -maxrate 512k -f rtsp <output_rtsp_url>
      • 查看系统资源使用情况,特别是磁盘I/O。如果磁盘I/O繁忙,可能会影响推流。可以使用iostat命令查看磁盘I/O统计信息:
        bash sudo apt-get install sysstat iostat -x 1
      • 检查是否有其他进程占用了大量系统资源,导致推流中断。可以使用top命令查看系统进程资源占用情况:
        bash top
      • 流媒体服务器配置方面
      • 查看mediamtx的日志文件,获取更详细的错误信息。通常日志文件路径在其配置文件中指定,比如:
        bash tail -f <mediamtx_log_file_path>
      • 检查mediamtx的配置文件,看是否有与推流时长相关的限制配置。例如,检查是否存在连接超时、会话超时等参数设置,确保这些参数设置足够大以适应推流需求。
    3. 不同方案优缺点:
      - 网络方面
      • 优点:从网络层面全面排查,较为基础且全面,能发现很多潜在的网络相关问题。
      • 缺点:排查过程可能较为繁琐,需要一定的网络知识,且可能需要一些工具的安装和配置。
      • ffmpeg参数及资源方面
      • 优点:针对性较强,直接对ffmpeg推流过程进行优化,可能快速解决因参数设置不合理导致的问题。
      • 缺点:需要对ffmpeg参数有一定了解,可能需要多次尝试不同参数组合来找到最优解。
      • 流媒体服务器配置方面
      • 优点:直接针对流媒体服务器进行检查,能找到服务器配置层面可能导致问题的原因。
      • 缺点:需要熟悉流媒体服务器的配置和日志分析,可能需要查阅相关文档来理解日志信息和配置参数。
    4. 总结:
      - 推流出现规律性中断是一个比较复杂的问题,需要从网络、ffmpeg参数及资源、流媒体服务器配置等多个方面进行排查。通过逐步检查上述各个方面,可以找到导致推流中断的原因并解决问题。在排查过程中,要充分利用各种工具获取详细信息,以便更准确地定位问题所在。

    希望以上解答对您有所帮助。如果您有任何疑问,欢迎在评论区提出。

    评论

报告相同问题?

问题事件

  • 创建了问题 7月14日