通过python与ffmpeg结合使用,可生成进行视频点播,直播的压力测试脚本。可支持不同类型的视频流,比如rtmp或者hls形式。
通过如下方式执行脚本:python multiRealPlay。py (rtmp | http][线程计数](时间间隔)
[rtmp | http]:视频播放的不同形式
(线程计数):并发线程数
(时间间隔):启动每个线程的间隔时间
代码:
# !/usr/bin/python # - * -编码:utf - 8 - * “‘ 创建于2015年7月22日 @author: LiBiao “‘ 进口日期时间,时间 进口线程 导入子流程 进口操作系统,base64 导入系统 进口队列 队列=Queue.Queue () #需要手动配置的参数 #启动序号 SLEEP_TIME=0 #直播地址 FULL_ADDR={} #需要手动配置的参数 RTMP_ADDR=' rtmp://192.168.1.208:1935/生活/HTTP_ADDR=' http://192.168.1.208:80生活' liveID=' 100002750 ' #来自于万视无忧中创建的直播 urlKey=' a1e5c680f7bfc85851de8ab2e63b0a33 ' #来自于万视无忧安全设置模块 liveResCode=71 ac6c06d3 #直播源码 #生成MD5值 def getMD5_Value (inputdata): 试一试: 进口hashlib 散列=hashlib.md5 (inputdata.encode (utf - 8)) 除了ImportError: #为python & lt; & lt;2.5 进口md5 散列=md5.new () 返回hash.hexdigest () #直播地址组装 def build_live_addr (): t=time.strftime (' % Y % m % d % H % m % S的,time.localtime ()) [2] data=' https://www.yisu.com/zixun/%s # % # %年代' % (liveID t urlKey) 秘密=getMD5_Value(数据) rtmp_addr=' % s % & # 63; liveID=% s&时间=% s&秘密=% s ' % (rtmp_addr, liveResCode liveID t,秘密) http_addr=' % s/% s/playlist.m3u8& # 63; liveID=% s&时间=% s&秘密=% s ' % (http_addr, liveResCode liveID t,秘密) FULL_ADDR [' rtmp ']=rtmp_addr FULL_ADDR=http_addr“http” 返回FULL_ADDR #获取本机ip地址,用来产生区别于其他机器的数据 def get_local_ip (): 试一试: ip=操作系统。popen (“ifconfig | grep的inet addr | awk{打印$ 2}”).read () ip=ip (ip.find (“:”) + 1: ip.find (“\ n”)] 除了例外,e: 打印e 返回的ip 类Video_To_Live (threading.Thread): def __init__(自我、队列): threading.Thread.__init__(自我) 自我。队列=队列 def运行(自我): liveAddr=self.queue.get () #打印liveAddr 试一试: 打印liveAddr subprocess.call ('。/ffmpeg - i \“% s \”- c: v - c:复制一个副本净水器:4000/dev/null aac_adtstoasc - y - f flv超时2在/dev/null的% liveAddr, stdout=subprocess.PIPE shell=True) 除了例外e: wiriteLog(‘错误’,str (e)) self.queue.task_done () if __name__==癬_main__”: time . sleep (SLEEP_TIME) 解析器=argparse。ArgumentParser(描述=吧嬗蜗贰? 解析器。add_argument (——liveType, action=吧痰辍?dest=發iveType”,要求=False) 解析器。add_argument (——pnum, action=吧痰辍?dest=皃num”=int类型,需要=False) 解析器。add_argument (——itime, action=吧痰辍?dest=癷time”,要求=False) given_args=parser.parse_args () liveType=given_args.liveType threadNum=given_args.pnum intervalTime=given_args.itime 打印“% d个% s进程开始运行........”% (threadNum Video_To_Live) 因为我在xrange (threadNum): videotolive=Video_To_Live(队列) videotolive.setDaemon(真正的) videotolive.start () 因为我在xrange (threadNum): 如果liveType (“http”、“rtmp”): addr=build_live_addr () liveaddr=addr [liveType] queue.put (liveaddr) time . sleep (intervalTime) queue.join () 打印”进程退出” >之前以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
python + ffmpeg视频并发直播压力测试