python + ffmpeg视频并发直播压力测试

  

通过python与ffmpeg结合使用,可生成进行视频点播,直播的压力测试脚本。可支持不同类型的视频流,比如rtmp或者hls形式。
  通过如下方式执行脚本:python multiRealPlay。py (rtmp | http][线程计数](时间间隔)
  [rtmp | http]:视频播放的不同形式
  (线程计数):并发线程数
  (时间间隔):启动每个线程的间隔时间

  

代码:         # !/usr/bin/python   # - * -编码:utf - 8 - *   “‘   创建于2015年7月22日      @author: LiBiao   “‘   进口日期时间,时间   进口线程   导入子流程   进口操作系统,base64   导入系统   进口队列      队列=Queue.Queue ()         #需要手动配置的参数      #启动序号   SLEEP_TIME=0      #直播地址   FULL_ADDR={}      #需要手动配置的参数   RTMP_ADDR=' rtmp://192.168.1.208:1935/生活/HTTP_ADDR=' http://192.168.1.208:80生活'   liveID=' 100002750 ' #来自于万视无忧中创建的直播   urlKey=' a1e5c680f7bfc85851de8ab2e63b0a33 ' #来自于万视无忧安全设置模块   liveResCode=71 ac6c06d3 #直播源码         #生成MD5值   def getMD5_Value (inputdata):   试一试:   进口hashlib   散列=hashlib.md5 (inputdata.encode (utf - 8))   除了ImportError:   #为python & lt; & lt;2.5   进口md5   散列=md5.new ()      返回hash.hexdigest ()         #直播地址组装   def build_live_addr ():   t=time.strftime (' % Y % m % d % H % m % S的,time.localtime ()) [2]   data=' https://www.yisu.com/zixun/%s # % # %年代' % (liveID t urlKey)   秘密=getMD5_Value(数据)   rtmp_addr=' % s % & # 63; liveID=% s&时间=% s&秘密=% s ' % (rtmp_addr, liveResCode liveID t,秘密)   http_addr=' % s/% s/playlist.m3u8& # 63; liveID=% s&时间=% s&秘密=% s ' % (http_addr, liveResCode liveID t,秘密)   FULL_ADDR [' rtmp ']=rtmp_addr   FULL_ADDR=http_addr“http”   返回FULL_ADDR      #获取本机ip地址,用来产生区别于其他机器的数据   def get_local_ip ():   试一试:   ip=操作系统。popen (“ifconfig | grep的inet addr | awk{打印$ 2}”).read ()   ip=ip (ip.find (“:”) + 1: ip.find (“\ n”)]   除了例外,e:   打印e   返回的ip         类Video_To_Live (threading.Thread):   def __init__(自我、队列):   threading.Thread.__init__(自我)   自我。队列=队列      def运行(自我):   liveAddr=self.queue.get ()   #打印liveAddr   试一试:   打印liveAddr   subprocess.call ('。/ffmpeg - i \“% s \”- c: v - c:复制一个副本净水器:4000/dev/null aac_adtstoasc - y - f flv超时2在/dev/null的% liveAddr, stdout=subprocess.PIPE shell=True)   除了例外e:   wiriteLog(‘错误’,str (e))   self.queue.task_done ()         if __name__==癬_main__”:   time . sleep (SLEEP_TIME)   解析器=argparse。ArgumentParser(描述=吧嬗蜗贰?   解析器。add_argument (——liveType, action=吧痰辍?dest=發iveType”,要求=False)   解析器。add_argument (——pnum, action=吧痰辍?dest=皃num”=int类型,需要=False)   解析器。add_argument (——itime, action=吧痰辍?dest=癷time”,要求=False)   given_args=parser.parse_args ()      liveType=given_args.liveType   threadNum=given_args.pnum   intervalTime=given_args.itime      打印“% d个% s进程开始运行........”% (threadNum Video_To_Live)   因为我在xrange (threadNum):   videotolive=Video_To_Live(队列)   videotolive.setDaemon(真正的)   videotolive.start ()      因为我在xrange (threadNum):   如果liveType (“http”、“rtmp”):   addr=build_live_addr ()   liveaddr=addr [liveType]   queue.put (liveaddr)   time . sleep (intervalTime)   queue.join ()   打印”进程退出”   之前      

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

python + ffmpeg视频并发直播压力测试