我目前在ubuntu上python使用ffmpeg拉流并推流,并使用mediamtx作为流媒体服务,但是出现推流6分钟左右规律性断流,从py程序来看并不是拉流推流的问题,也不是mediamtx流媒体服务问题。因为换了ZlmMediaKit流媒体服务依然出现这个问题,并且在客户端无法拉流后推流程序健康检测系统完全正常。
1、mediamtx日志

2、ZlmMediaKit日志

3、拉流、推流程序
主程序
import cv2
import signal
import sys
import threading
import time
from datetime import datetime
import queue
import subprocess
import os
import logging
# 导入配置文件
from stream_config import RTSP_URLS, STREAM_MAPPING, FFMPEG_CONFIG, WINDOW_CONFIG, LOG_CONFIG
# 配置日志
logging.basicConfig(
level=getattr(logging, LOG_CONFIG['level']),
format=LOG_CONFIG['format'],
handlers=[
logging.FileHandler(LOG_CONFIG['file']),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class FFmpegStreamer:
"""FFmpeg推流处理器"""
def __init__(self, stream_name, input_url, output_url):
self.stream_name = stream_name
self.input_url = input_url
self.output_url = output_url
self.process = None
self.running = False
def start(self):
"""启动FFmpeg推流"""
try:
# FFmpeg命令参数 - 使用配置文件参数
cmd = [
'ffmpeg',
'-i', self.input_url, # 输入RTSP流
'-c:v', FFMPEG_CONFIG['video_codec'], # 视频编码器
'-preset', FFMPEG_CONFIG['preset'], # 编码速度预设
'-tune', FFMPEG_CONFIG['tune'], # 调优参数
'-profile:v', FFMPEG_CONFIG['profile'], # 编码配置
'-pix_fmt', FFMPEG_CONFIG['pixel_format'], # 像素格式
'-c:a', FFMPEG_CONFIG['audio_codec'], # 音频编码
'-f', 'rtsp', # 输出格式为RTSP
'-rtsp_transport', FFMPEG_CONFIG['transport'], # 传输协议
'-muxdelay', FFMPEG_CONFIG['mux_delay'], # 缓冲延迟
self.output_url # 输出RTSP地址
]
print(f"[{self.stream_name}] 启动FFmpeg推流: {self.input_url} -> {self.output_url}")
# 启动FFmpeg进程
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE
)
self.running = True
print(f"[{self.stream_name}] FFmpeg推流已启动")
return True
except Exception as e:
print(f"[{self.stream_name}] FFmpeg推流启动失败: {str(e)}")
return False
def stop(self):
"""停止FFmpeg推流"""
if self.process and self.running:
try:
self.process.terminate()
self.process.wait(timeout=5)
print(f"[{self.stream_name}] FFmpeg推流已停止")
except subprocess.TimeoutExpired:
self.process.kill()
print(f"[{self.stream_name}] FFmpeg推流已强制停止")
except Exception as e:
print(f"[{self.stream_name}] 停止FFmpeg推流时出错: {str(e)}")
finally:
self.running = False
self.process = None
def is_running(self):
"""检查FFmpeg进程是否正在运行"""
if self.process:
return self.process.poll() is None
return False
def restart(self):
"""重启FFmpeg推流"""
print(f"[{self.stream_name}] 正在重启FFmpeg推流...")
self.stop()
time.sleep(2) # 等待2秒再重启
return self.start()
def monitor_health(self):
"""监控推流健康状态并自动重启"""
retry_count = 0
max_retries = FFMPEG_CONFIG['max_retries']
retry_delay = FFMPEG_CONFIG['retry_delay']
check_interval = FFMPEG_CONFIG['health_check_interval']
logger.info(f"[{self.stream_name}] 健康监控已启动")
while self.running:
if not self.is_running():
if retry_count < max_retries:
retry_count += 1
logger.warning(f"[{self.stream_name}] 推流进程异常退出,正在重启... (尝试 {retry_count}/{max_retries})")
if self.restart():
retry_count = 0 # 重启成功,重置计数器
logger.info(f"[{self.stream_name}] 推流重启成功")
else:
logger.error(f"[{self.stream_name}] 推流重启失败")
time.sleep(retry_delay) # 失败后等待配置的时间
else:
logger.error(f"[{self.stream_name}] 达到最大重试次数({max_retries}),停止监控")
break
else:
retry_count = 0 # 运行正常,重置计数器
time.sleep(check_interval) # 按配置间隔检查
class StreamHandler:
"""单个视频流处理器"""
def __init__(self, stream_name, rtsp_url):
self.stream_name = stream_name
self.rtsp_url = rtsp_url
self.cap = None
self.running = True
self.frame_queue = queue.Queue(maxsize=10)
self.capture_thread = None
# FPS计算相关
self.fps_counter = 0
self.fps_start_time = time.time()
self.current_fps = 0
def capture_frames(self):
"""在单独线程中捕获视频帧"""
while self.running and self.cap and self.cap.isOpened():
ret, frame = self.cap.read()
if not ret:
print(f"[{self.stream_name}] 视频流中断或结束")
break
# 如果队列满了,丢弃最旧的帧
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except queue.Empty:
pass
# 添加新帧到队列
try:
self.frame_queue.put(frame, timeout=0.1)
except queue.Full:
pass
def draw_info_on_frame(self, frame):
"""在帧上绘制FPS和时间戳信息"""
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 计算FPS
self.fps_counter += 1
current_time = time.time()
if current_time - self.fps_start_time >= 1.0:
self.current_fps = self.fps_counter / (current_time - self.fps_start_time)
self.fps_counter = 0
self.fps_start_time = current_time
# 设置文本参数
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
color = (0, 255, 0) # 绿色
thickness = 2
# 绘制半透明背景
overlay = frame.copy()
cv2.rectangle(overlay, (10, 10), (450, 100), (0, 0, 0), -1)
frame = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0)
# 绘制流名称
stream_text = f"Stream: {self.stream_name}"
cv2.putText(frame, stream_text, (20, 35), font, font_scale, (255, 255, 0), thickness)
# 绘制FPS信息
fps_text = f"FPS: {self.current_fps:.1f}"
cv2.putText(frame, fps_text, (20, 60), font, font_scale, color, thickness)
# 绘制时间戳
timestamp_text = f"Time: {timestamp}"
cv2.putText(frame, timestamp_text, (20, 85), font, font_scale, color, thickness)
return frame
def get_latest_frame(self):
"""获取最新的处理后帧"""
frame = None
# 获取队列中最新的帧,丢弃旧帧
while not self.frame_queue.empty():
try:
frame = self.frame_queue.get_nowait()
except queue.Empty:
break
if frame is not None:
return self.draw_info_on_frame(frame)
return None
def start(self):
"""启动视频流处理"""
print(f"[{self.stream_name}] 正在连接RTSP流: {self.rtsp_url}")
try:
# 连接RTSP流
self.cap = cv2.VideoCapture(self.rtsp_url)
if not self.cap.isOpened():
raise Exception(f"无法连接到RTSP流: {self.rtsp_url}")
print(f"[{self.stream_name}] 连接成功,开始播放视频...")
# 获取视频信息
fps = self.cap.get(cv2.CAP_PROP_FPS)
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"[{self.stream_name}] 视频信息 - 分辨率: {width}x{height}, 原始帧率: {fps:.1f} FPS")
# 启动捕获线程
self.capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
self.capture_thread.start()
return True
except Exception as e:
print(f"[{self.stream_name}] 错误: {str(e)}")
return False
def stop(self):
"""停止视频流处理"""
self.running = False
# 等待线程结束
if self.capture_thread and self.capture_thread.is_alive():
self.capture_thread.join(timeout=2.0)
if self.cap:
self.cap.release()
print(f"[{self.stream_name}] 资源已清理")
class MultiStreamViewer:
def __init__(self):
self.streams = {}
self.ffmpeg_streamers = {} # FFmpeg推流器
self.running = True
# 设置信号处理器来处理CTRL+C
signal.signal(signal.SIGINT, self.signal_handler)
def signal_handler(self, sig, frame):
print("\n检测到CTRL+C,正在退出...")
self.stop_all_streams()
def start_ffmpeg_streamers(self):
"""启动所有FFmpeg推流器"""
print("正在启动FFmpeg推流器...")
for stream_name, input_url in RTSP_URLS.items():
if stream_name in STREAM_MAPPING:
output_url = STREAM_MAPPING[stream_name]
ffmpeg_streamer = FFmpegStreamer(stream_name, input_url, output_url)
if ffmpeg_streamer.start():
self.ffmpeg_streamers[stream_name] = ffmpeg_streamer
# 启动健康监控线程
monitor_thread = threading.Thread(
target=ffmpeg_streamer.monitor_health,
daemon=True,
name=f"Monitor-{stream_name}"
)
monitor_thread.start()
print(f"[{stream_name}] FFmpeg推流启动成功,健康监控已启动")
else:
print(f"[{stream_name}] FFmpeg推流启动失败")
print(f"成功启动 {len(self.ffmpeg_streamers)} 个FFmpeg推流器")
def start_all_streams(self):
"""启动所有视频流"""
print("正在启动多路RTSP视频流和FFmpeg推流...")
print("推流映射:")
for stream_name, output_url in STREAM_MAPPING.items():
if stream_name in RTSP_URLS:
print(f" {stream_name}: {RTSP_URLS[stream_name]} -> {output_url}")
print("按CTRL+C或'q'键退出程序")
print()
# 首先启动FFmpeg推流器
self.start_ffmpeg_streamers()
# 等待一下让FFmpeg推流器稳定
time.sleep(2)
# 创建并启动所有流处理器
for stream_name, rtsp_url in RTSP_URLS.items():
stream_handler = StreamHandler(stream_name, rtsp_url)
if stream_handler.start():
self.streams[stream_name] = stream_handler
else:
print(f"[{stream_name}] 启动失败")
if not self.streams:
print("没有成功启动的视频流,程序退出")
return
print(f"成功启动 {len(self.streams)} 路视频流")
# 在主线程中创建窗口并显示视频
windows = {}
for stream_name in self.streams.keys():
window_name = f'RTSP视频流 - {stream_name}'
cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)
# 设置窗口位置(使用配置文件)
if stream_name in WINDOW_CONFIG:
pos = WINDOW_CONFIG[stream_name]
cv2.moveWindow(window_name, pos['x'], pos['y'])
else:
cv2.moveWindow(window_name, 50, 50) # 默认位置
windows[stream_name] = window_name
# 主循环,在主线程中显示所有视频流
try:
while self.running and any(stream.running for stream in self.streams.values()):
# 更新所有窗口的帧
for stream_name, stream_handler in self.streams.items():
if stream_handler.running:
frame = stream_handler.get_latest_frame()
if frame is not None:
cv2.imshow(windows[stream_name], frame)
# 检查窗口是否被关闭
if cv2.getWindowProperty(windows[stream_name], cv2.WND_PROP_VISIBLE) < 1:
stream_handler.running = False
# FFmpeg推流器状态由独立的健康监控线程处理
# 检查按键
key = cv2.waitKey(30) & 0xFF
if key == ord('q'):
break
# 检查是否所有流都已停止
if not any(stream.running for stream in self.streams.values()):
print("所有视频流已停止")
break
except KeyboardInterrupt:
print("\n检测到键盘中断")
finally:
self.stop_all_streams()
def stop_all_streams(self):
"""停止所有视频流和FFmpeg推流器"""
self.running = False
print("正在停止所有视频流和FFmpeg推流器...")
# 停止所有视频流处理器
for stream_name, stream_handler in self.streams.items():
stream_handler.stop()
# 停止所有FFmpeg推流器
for stream_name, ffmpeg_streamer in self.ffmpeg_streamers.items():
ffmpeg_streamer.stop()
cv2.destroyAllWindows()
print("所有资源已清理,程序退出")
sys.exit(0)
def main():
viewer = MultiStreamViewer()
viewer.start_all_streams()
if __name__ == "__main__":
main()
配置文件
# 流媒体配置文件
# Stream Configuration File
# RTSP源地址配置
RTSP_URLS = {
"video1": "rtsp://192.168.144.25:8554/video1",
# "video2": "rtsp://192.168.144.25:8554/video2"
}
# 推流目标地址映射
STREAM_MAPPING = {
"video1": "rtsp://127.0.0.1:8554/stream/main",
# "video2": "rtsp://127.0.0.1:8554/stream/thermal"
}
# FFmpeg编码配置
FFMPEG_CONFIG = {
# 视频编码设置
'video_codec': 'libx264', # H.264编码器
'preset': 'ultrafast', # 编码速度预设
'tune': 'zerolatency', # 零延迟调优
'profile': 'baseline', # 基线配置,最大兼容性
'pixel_format': 'yuv420p', # 标准像素格式
# 音频编码设置
'audio_codec': 'copy', # 音频直接复制
# 传输设置
'transport': 'tcp', # 强制使用TCP传输
'mux_delay': '0.1', # 减少缓冲延迟
# 重试设置
'max_retries': 3, # 最大重试次数
'retry_delay': 5, # 重试间隔(秒)
'health_check_interval': 5, # 健康检查间隔(秒)
'heartbeat_drop': 60, # 心跳丢失阈值
}
# 显示窗口配置
WINDOW_CONFIG = {
'video1': {'x': 50, 'y': 50},
'video2': {'x': 650, 'y': 50}
}
# 日志配置
LOG_CONFIG = {
'level': 'INFO',
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
'file': 'stream_monitor.log'
}