水槽实际生产场景分析

  

<强>需求:B两台日志服务器实时生产日志主要类型为access.log, nginx.log, web.log,现在要求:
把A、B机器中的access.log, nginx.log,网络。日志采集汇总到C机器上然后统一收集到hdfs中,但是在hdfs中要求的目录为:
   //logs/访问来源/日期/* *
   /源/logs/nginx/日期/* *
   /源/logs/web/日期/* *
<>强场景分析:
水槽实际生产场景分析

  

<>强规划:
hadoop01 (web01):
     来源:访问。日志、nginx.log web.log
     频道:内存
     水槽:avro
hadoop02 (web02):
     来源:访问。日志、nginx.log web.log
     频道:内存
     水槽:avro
hadoop03(数据收集):
     源,avro
     频道:内存
     水槽:hdfs
<强>配置文件:

  
 <代码> # exec_source_avro_sink.properties
  #指定各个核心组件
  a1。=r1, r2 r3来源
  a1。水槽=k1
  a1。渠道=c?
  # r1
  a1.sources.r1。类型=执行
  a1.sources.r1.command=tail - f/home/hadoop/flume_data/access.log
  a1.sources.r1.interceptors=i1
  a1.sources.r1.interceptors.i1.type=静态
  a1.sources.r1.interceptors.i1.key=类型
  a1.sources.r1.interceptors.i1.value=https://www.yisu.com/zixun/access
  
  # r2
  a1.sources.r2。类型=执行
  a1.sources.r2.command=tail - f/home/hadoop/flume_data/nginx.log
  a1.sources.r2.interceptors=i2
  a1.sources.r2.interceptors.i2.type=静态
  a1.sources.r2.interceptors.i2.key=类型
  a1.sources.r2.interceptors.i2.value=nginx
  
  # r3
  a1.sources.r3。类型=执行
  a1.sources.r3.command=tail - f/home/hadoop/flume_data/web.log
  a1.sources.r3.interceptors=i3
  a1.sources.r3.interceptors.i3.type=静态
  a1.sources.r3.interceptors.i3.key=类型
  a1.sources.r3.interceptors.i3.value=网络
  
  #描述沉
  a1.sinks.k1。类型=avro
  a1.sinks.k1。主机名=hadoop03
  a1.sinks.k1。端口=41414
  
  #使用一个通道在内存中缓冲事件
  a1.channels.c1。类型=内存
  a1.channels.c1。能力=20000
  a1.channels.c1。transactionCapacity=10000
  
  # channela1.sources.r1绑定源和水槽。渠道=c?
  a1.sources.r2。渠道=c?
  a1.sources.r3。渠道=c?
  a1.sinks.k1。通道c1= 
  
 <代码> # avro_source_hdfs_sink.properties
  #定义代理名、来源、渠道,水槽的名称
  a1。=r1来源
  a1。水槽=k1
  a1。渠道=c?
  
  #定义来源
  a1.sources.r1。类型=avro
  a1.sources.r1。绑定=0.0.0.0
  a1.sources.r1。端口=41414
  
  #添加时间拦截器
  a1.sources.r1.interceptors=i1
  a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor $建设者
  
  #定义渠道
  a1.channels.c1。类型=内存
  a1.channels.c1。能力=20000
  a1.channels.c1。transactionCapacity=10000
  
  #定义水槽
  a1.sinks.k1。类型=hdfs
  a1.sinks.k1.hdfs.path=hdfs://myha01/源/日志/Y %{类型}/% % m % d
  a1.sinks.k1.hdfs。filePrefix=事件
  a1.sinks.k1.hdfs。文件类型=DataStream数据
  a1.sinks.k1.hdfs。writeFormat=文本
  #时间类型
  a1.sinks.k1.hdfs。useLocalTimeStamp=true
  #生成的文件不按条数生成
  a1.sinks.k1.hdfs。rollCount=0
  #生成的文件按时间生成
  a1.sinks.k1.hdfs。rollInterval=30
  #生成的文件按大小生成
  a1.sinks.k1.hdfs。rollSize=10485760
  #批量写入hdfs的个数
  a1.sinks.k1.hdfs。batchSize=20
  #水槽操作hdfs的线程数(包括新建,写入等)
  a1.sinks.k1.hdfs.threadsPoolSize=10
  #操作hdfs超时时间
  a1.sinks.k1.hdfs.callTimeout=30000
  
  #组装源、渠道下沉
  a1.sources.r1。渠道=c?
  a1.sinks.k1。通道c1= 
  

<强>测试:

  
 <代码> #在hadoop01和hadoop02上的/home/hadoop/数据有数据文件access.log, nginx。日志,web.log
  #先启动hadoop03上的水槽:(存储)
  flume-ng代理- c conf - f avro_source_hdfs_sink。- name属性a1 -Dflume.root.logger=调试控制台
  #然后在启动hadoop01和hadoop02上的命令水槽(收集)
  flume-ng代理- c conf - f exec_source_avro_sink。- name属性a1 -Dflume.root.logger=调试控制台 

水槽实际生产场景分析