有55人阅读过
群晖套件iptv自动测试/删除失效源(只能测V4,仅供参考)
发布于2024/01/16 更新于2024/01/17
[ 教程仅保证更新时有效,请自行测试。]
发布于2024/01/16 更新于2024/01/17
[ 教程仅保证更新时有效,请自行测试。]
[ 教程仅保证更新时有效,请自行测试。]
下载压缩文件,上传到和iptv同一个机器上,使用 控制面板-计划任务 定时执行。
# 此脚本功能是链接iptv的数据库,自动测试直播源有效性,并删除无效直播源,可添加在群晖的计划任务中定时执行。
# 此脚本检测结果可能存在部分误报,请谨慎使用。
# 此脚本依赖python和FFmpeg4,请手动在套件中心安装,如使用其他版本FFmpeg,请在下文中修改ffprobe的位置。
# 此脚本需要大量的库,请手动运行一次,根据提示,安装所需的依赖库
# 此脚本需要在安装iptv套件的本地运行,请在同一台机器上执行。
代码如下:
import csv import sys import json import os import re import traceback import requests import subprocess import time import shutil import pymysql import pandas as pd from ffmpy import FFprobe from subprocess import PIPE from sys import stdout from termcolor import colored, RESET from datetime import datetime from func_timeout import func_set_timeout, FunctionTimedOut from requests.adapters import HTTPAdapter dt = datetime.now() uniqueList = [] # 此脚本功能是链接iptv的数据库,自动测试直播源有效性,并删除无效直播源,可添加在群晖的计划任务中定时执行。 # 此脚本检测结果可能存在部分误报,请谨慎使用。 # 此脚本依赖python和FFmpeg4,请手动在套件中心安装,如使用其他版本FFmpeg,请在下文中修改ffprobe的位置。 # 此脚本需要大量的库,请手动运行一次,根据提示,安装所需的依赖库 # 此脚本需要在安装iptv套件的本地运行,请在同一台机器上执行。 # 替换以下信息为你的数据库连接参数 host = 'localhost' user = 'iptv_user' password = 'xxxxxxxx' database = 'iptv' # 提取数据库中的频道信息到csv文件 def outcsv(): conn = pymysql.connect(host=host, user=user, password=password, database=database) cursor = conn.cursor() query = "SELECT name, category, id, url FROM iptv_channels" cursor.execute(query) result = cursor.fetchall() conn.close() columns = ['Channel', 'Group', 'Source', 'Link'] df = pd.DataFrame(result, columns=columns) df.to_csv('data.csv', index=False, encoding='utf-8-sig') # 在此处修改超时设置 @func_set_timeout(5) def get_stream(num, clist, uri): try: # 默认ffprobe位置为ffmpeg4的位置,如果安装了其他版本,也可以修改此处的文件地址为其他版本的。 ffprobe = FFprobe(executable='/var/packages/ffmpeg/target/bin/ffprobe', inputs={uri: '-v error -show_format -show_streams -print_format json'}) cdata = json.loads(ffprobe.run( stdout=PIPE, stderr=PIPE)[0].decode('utf-8')) return cdata except Exception as e: print('[{}] {}({}) Error:{}'.format( str(num), clist[0], clist[2], str(e))) return False def check_channel(clist, num): uri = clist[3] requests.adapters.DEFAULT_RETRIES = 3 try: r = requests.get(clist[3], timeout=3) if r.status_code == requests.codes.ok: cdata = get_stream(num, clist, uri) if cdata: flagAudio = 0 flagVideo = 0 flagHDR = 0 flagHEVC = 0 vwidth = 0 vheight = 0 for i in cdata['streams']: if i['codec_type'] == 'video': flagVideo = 1 if 'color_space' in i: if 'bt2020' in i['color_space']: flagHDR = 1 if i['codec_name'] == 'hevc': flagHEVC = 1 if vwidth <= i['coded_width']: vwidth = i['coded_width'] vheight = i['coded_height'] elif i['codec_type'] == 'audio': flagAudio = 1 if flagAudio == 0: print('[{}] {}({}) Error: Video Only!'.format( str(num), clist[0], clist[2])) return False if flagVideo == 0: print('[{}] {}({}) Error: Audio Only!'.format( str(num), clist[0], clist[2])) return False if (vwidth == 0) or (vheight == 0): print('[{}] {}({}) Error: {}x{}'.format( str(num), clist[0], clist[2], vwidth, vheight)) if flagHDR == 0: print('[{}] {}({}) PASS: {}*{}'.format(str(num), clist[0], clist[2], vwidth, vheight)) return [vwidth, vheight, ''] if flagHDR == 1: print('[{}] {}({}) PASS(HDR Enabled): {}*{}'.format(str(num), clist[0], clist[2], vwidth, vheight)) return [vwidth, vheight, 'HDR'] if flagHEVC == 1: print('[{}] {}({}) PASS(HEVC Enabled): {}*{}'.format(str(num), clist[0], clist[2], vwidth, vheight)) return [vwidth, vheight, 'HEVC'] else: return False else: print('[{}] {}({}) Error:{}'.format( str(num), clist[0], clist[2], str(r.status_code))) return False except Exception as e: print('[{}] {}({}) Error:{}'.format( str(num), clist[0], clist[2], str(e))) return False def print_info(): print('Time: {}-{}-{} {}:{}'.format(dt.year, dt.month, dt.day, dt.hour, dt.minute)) def rm_files(target, selection): if selection == 1: try: shutil.rmtree(target) except OSError: pass try: os.mkdir(target) except OSError: pass else: try: os.remove(target) except OSError: pass def getdes(st): if st: return '[{}]'.format(st) else: return '' # 导出测试失败列表 def main(): print_info() Total = 0 Total_failed = 0 fulltimes = '-{}{}{}{}{}'.format(dt.year, dt.month, dt.day, dt.hour, dt.minute) times = '' rm_files('failed.txt', 2) with open('data.csv') as f: f_csv = csv.reader(f) headers = next(f_csv) num = 1 failed_channels = [] for row in f_csv: try: if row[3] in uniqueList: ret = False else: uniqueList.append(row[3]) ret = check_channel(row, num) except FunctionTimedOut as e: print('[{}] {}({}) Error:{}'.format( str(num), row[0], row[2], str(e))) ret = False if ret: Total += 1 else: failed_channels.append(row[3]) Total_failed += 1 num += 1 time.sleep(0.25) with open('failed.txt', 'w') as failed_file: for failed_channel in failed_channels: failed_file.write(f'{failed_channel}\n') print('有效数量: {}'.format(Total)) print('无效数量: {}'.format(Total_failed)) # 删除测试失败链接 def delchannel(): conn = pymysql.connect(host=host, user=user, password=password, database=database) cursor = conn.cursor() # 从failed.txt文件中读取URLs with open('failed.txt', 'r') as file: urls = file.read().splitlines() # 删除对应的记录 for url in urls: try: # 使用参数化查询防止SQL注入 delete_query = "DELETE FROM iptv_channels WHERE url = %s" cursor.execute(delete_query, (url,)) conn.commit() print(f"'{url}' 已从数据库删除.") except Exception as e: print(f"删除失败: '{url}': {e}") conn.rollback() # 关闭数据库连接 cursor.close() conn.close() # 删除data.csv文件 csv_file_path = 'data.csv' try: os.remove(csv_file_path) print(f"临时文件:{csv_file_path} 已清除。") except FileNotFoundError: print(f"{csv_file_path} 文件不存在,无需删除。") except Exception as e: print(f"删除 {csv_file_path} 文件时出错:{e}") if __name__ == '__main__': outcsv(); main(); delchannel();
文章对你有帮助吗?
- 一般[0]
- 很赞[0]
- 没用[0]
- 垃圾[0]
- 无语[0]