介绍
这篇文章给大家介绍基于水槽+卡夫卡+ Spark-Streaming的实时流式处理过程是怎样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
基于水槽+卡夫卡+ Spark-Streaming的实时流式处理完整流程
1,环境准备,四台测试服务器
火花集群三台,spark1, spark2, spark3
卡夫卡集群三台,spark1, spark2, spark3
管理员集群三台,spark1, spark2, spark3
日志接收服务器,spark1
日志收集服务器,复述(这台机器用来做复述,开发的,现在用来做日志收集的测试,主机名就不改了)
日志收集流程:
日志收集服务器→日志接收服务器→卡夫卡集群→火花集群处理
说明:,日志收集服务器,在实际生产中很有可能是应用系统服务器,日志接收服务器为大数据服务器中一台,日志通过网络传输到日志接收服务器,再入集群处理。
因为,生产环境中,往往网络只是单向开放给某台服务器的某个端口访问的。
水槽版本:apache-flume-1.5.0-cdh6.4.9,该版本已经较好地集成了对卡夫卡的支持
2日志收集服务器(汇总端)
配置水槽动态收集特定的日志,收集。相依,配置如下:
#, Name 从而components 提醒却;能够代理 a1.sources =tailsource-1 a1.sinks =remotesink a1.channels =memoryChnanel-1 #,描述/configure 从而来源 a1.sources.tailsource - 1. - type =执行 时间=a1.sources.tailsource - 1. - command tail -F /opt/模块/tmpdata/logs/1.日志 a1.sources.tailsource - 1. - channels =memoryChnanel-1 #,Describe 从而,下沉 a1.sinks.k1.type =记录器 #,Use a  channel which buffers events 内存拷贝 时间=a1.channels.memorychnanel - 1. - type 内存 a1.channels.memorychnanel - 1. -让- alive =10 a1.channels.memorychnanel - 1. - capacity =100000 a1.channels.memorychnanel - 1. - transactioncapacity =100000 #,Bind 从而,source 以及sink 用,通道 a1.sinks.remotesink.type =avro a1.sinks.remotesink.hostname =spark1 a1.sinks.remotesink.port =666 a1.sinks.remotesink.channel =, memoryChnanel-1
日志实时监控日志后,通过网络avro类型,传输到spark1服务器的666端口上
启动日志收集端脚本:
<>以前bin/flume-ng agent ——conf conf ——conf-file conf/collect.conf ——name a1 -Dflume.root.logger=INFO,控制台3日志接收服务器
配置水槽实时接收日志,收集。相依,配置如下:
# agent section , 时间=producer.sources s , 时间=producer.channels c , 时间=producer.sinks r , ,, # source section , producer.sources.s.type =avro producer.sources.s.bind =spark1 producer.sources.s.port =666 时间=producer.sources.s.channels c , ,, #,Each 水槽# 39;s type must be defined , producer.sinks.r.type =org.apache.flume.sink.kafka.KafkaSink producer.sinks.r.topic =mytopic 时间=producer.sinks.r.brokerList spark1:9092、spark2:9092 spark3:9092 producer.sinks.r.requiredAcks =1 producer.sinks.r.batchSize =20 producer.sinks.r.channel =, c1 , # Specify 从而,channel 从而sink should use , 时间=producer.sinks.r.channel c , ,, #,Each 频道# 39;s type is 只定义, producer.channels.c.type ,,=, org.apache.flume.channel.kafka.KafkaChannel producer.channels.c.capacity =10000 producer.channels.c.transactionCapacity =1000 producer.channels.c.brokerList=spark1:9092 spark2:9092 spark3:9092 producer.channels.c.topic=channel1 producer.channels.c.zookeeperConnect=spark1:2181、spark2:2181 spark3:2181
关键是指定如源为接收网络端口的666年来的数据,并输入卡夫卡的集群,需配置好主题及zk的地址
启动接收端脚本:
<>以前bin/flume-ng agent ——conf conf ——conf-file conf/receive.conf ——name producer -Dflume.root.logger=INFO,控制台4,火花集群处理接收数据
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import kafka.serializer.StringDecoder import scala.collection.immutable.HashMap import org.apache.log4j.Level import org.apache.log4j.Logger/* * ,* @author 管理员 ,*/object KafkaDataTest  { def 才能;主要(args:数组[String]):, Unit =, { ,,,Logger.getLogger (“org.apache.spark") .setLevel (Level.WARN); ,,,Logger.getLogger (“org.eclipse.jetty.server") .setLevel (Level.ERROR); ,,,val conf =, new SparkConf () .setAppName (“stocker") .setMaster(“本地[2]“) ,,,val sc =, new SparkContext(配置) ,,,val ssc =, new StreamingContext (sc,秒(1)) ,,,//,Kafka 配置 ,,,val topics =,设置(“mytopic") ,,,val brokers =,“spark1:9092, spark2:9092, spark3:9092" null null null null null null null null null null null null null null null null null null null null基于水槽+卡夫卡+ Spark-Streaming的实时流式处理过程是怎样的