基于水槽+卡夫卡+ Spark-Streaming的实时流式处理过程是怎样的

  介绍

这篇文章给大家介绍基于水槽+卡夫卡+ Spark-Streaming的实时流式处理过程是怎样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

基于水槽+卡夫卡+ Spark-Streaming的实时流式处理完整流程

1,环境准备,四台测试服务器

火花集群三台,spark1, spark2, spark3

卡夫卡集群三台,spark1, spark2, spark3

管理员集群三台,spark1, spark2, spark3

日志接收服务器,spark1

日志收集服务器,复述(这台机器用来做复述,开发的,现在用来做日志收集的测试,主机名就不改了)

日志收集流程:

日志收集服务器→日志接收服务器→卡夫卡集群→火花集群处理

说明:,日志收集服务器,在实际生产中很有可能是应用系统服务器,日志接收服务器为大数据服务器中一台,日志通过网络传输到日志接收服务器,再入集群处理。

因为,生产环境中,往往网络只是单向开放给某台服务器的某个端口访问的。

水槽版本:apache-flume-1.5.0-cdh6.4.9,该版本已经较好地集成了对卡夫卡的支持

2日志收集服务器(汇总端)

配置水槽动态收集特定的日志,收集。相依,配置如下:

 #, Name 从而components 提醒却;能够代理
  a1.sources =tailsource-1
  a1.sinks =remotesink
  a1.channels =memoryChnanel-1
  
  #,描述/configure 从而来源
  a1.sources.tailsource - 1. - type =执行
  时间=a1.sources.tailsource - 1. - command  tail  -F /opt/模块/tmpdata/logs/1.日志
  
  a1.sources.tailsource - 1. - channels =memoryChnanel-1
  
  #,Describe 从而,下沉
  a1.sinks.k1.type =记录器
  
  #,Use  a  channel  which  buffers  events 内存拷贝
  时间=a1.channels.memorychnanel - 1. - type 内存
  a1.channels.memorychnanel - 1. -让- alive =10
  a1.channels.memorychnanel - 1. - capacity =100000
  a1.channels.memorychnanel - 1. - transactioncapacity =100000
  
  #,Bind 从而,source 以及sink 用,通道
  a1.sinks.remotesink.type =avro
  a1.sinks.remotesink.hostname =spark1
  a1.sinks.remotesink.port =666
  a1.sinks.remotesink.channel =, memoryChnanel-1 

日志实时监控日志后,通过网络avro类型,传输到spark1服务器的666端口上

启动日志收集端脚本:

<>以前bin/flume-ng  agent ——conf  conf ——conf-file  conf/collect.conf ——name  a1  -Dflume.root.logger=INFO,控制台

3日志接收服务器

配置水槽实时接收日志,收集。相依,配置如下:

 # agent  section ,
  时间=producer.sources  s ,
  时间=producer.channels  c ,
  时间=producer.sinks  r ,
  ,,
  # source  section ,
  producer.sources.s.type =avro
  producer.sources.s.bind =spark1
  producer.sources.s.port =666
  
  时间=producer.sources.s.channels  c ,
  ,,
  #,Each 水槽# 39;s  type  must  be  defined ,
  producer.sinks.r.type =org.apache.flume.sink.kafka.KafkaSink
  producer.sinks.r.topic =mytopic
  时间=producer.sinks.r.brokerList  spark1:9092、spark2:9092 spark3:9092
  producer.sinks.r.requiredAcks =1
  producer.sinks.r.batchSize =20
  producer.sinks.r.channel =, c1
  ,
  # Specify 从而,channel 从而sink  should  use ,
  时间=producer.sinks.r.channel  c ,
  ,,
  #,Each 频道# 39;s  type  is 只定义,
  producer.channels.c.type ,,=, org.apache.flume.channel.kafka.KafkaChannel
  producer.channels.c.capacity =10000
  producer.channels.c.transactionCapacity =1000
  producer.channels.c.brokerList=spark1:9092 spark2:9092 spark3:9092
  producer.channels.c.topic=channel1
  producer.channels.c.zookeeperConnect=spark1:2181、spark2:2181 spark3:2181 

关键是指定如源为接收网络端口的666年来的数据,并输入卡夫卡的集群,需配置好主题及zk的地址

启动接收端脚本:

<>以前bin/flume-ng  agent ——conf  conf ——conf-file  conf/receive.conf ——name  producer  -Dflume.root.logger=INFO,控制台

4,火花集群处理接收数据

 import  org.apache.spark.SparkConf
  import  org.apache.spark.SparkContext
  import  org.apache.spark.streaming.kafka.KafkaUtils
  import  org.apache.spark.streaming.Seconds
  import  org.apache.spark.streaming.StreamingContext
  import  kafka.serializer.StringDecoder
  import  scala.collection.immutable.HashMap
  import  org.apache.log4j.Level
  import  org.apache.log4j.Logger/* *
  ,* @author 管理员
  ,*/object  KafkaDataTest  {
  def 才能;主要(args:数组[String]):, Unit =, {
  
  ,,,Logger.getLogger (“org.apache.spark") .setLevel (Level.WARN);
  ,,,Logger.getLogger (“org.eclipse.jetty.server") .setLevel (Level.ERROR);
  
  ,,,val  conf =, new  SparkConf () .setAppName (“stocker") .setMaster(“本地[2]“)
  ,,,val  sc =, new  SparkContext(配置)
  
  ,,,val  ssc =, new  StreamingContext (sc,秒(1))
  
  ,,,//,Kafka 配置
  
  ,,,val  topics =,设置(“mytopic")
  
  ,,,val  brokers =,“spark1:9092, spark2:9092, spark3:9092"
  
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null

基于水槽+卡夫卡+ Spark-Streaming的实时流式处理过程是怎样的