<强>需求强>:B两台日志服务器实时生产日志主要类型为access.log, nginx.log, web.log,现在要求:
把A、B机器中的access.log, nginx.log,网络。日志采集汇总到C机器上然后统一收集到hdfs中,但是在hdfs中要求的目录为:
//logs/访问来源/日期/* *
/源/logs/nginx/日期/* *
/源/logs/web/日期/* *
<>强场景分析强>:
<>强规划强>:
hadoop01 (web01):
来源:访问。日志、nginx.log web.log
频道:内存
水槽:avro
hadoop02 (web02):
来源:访问。日志、nginx.log web.log
频道:内存
水槽:avro
hadoop03(数据收集):
源,avro
频道:内存
水槽:hdfs
<强>配置文件强>:
<代码> # exec_source_avro_sink.properties #指定各个核心组件 a1。=r1, r2 r3来源 a1。水槽=k1 a1。渠道=c? # r1 a1.sources.r1。类型=执行 a1.sources.r1.command=tail - f/home/hadoop/flume_data/access.log a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=静态 a1.sources.r1.interceptors.i1.key=类型 a1.sources.r1.interceptors.i1.value=https://www.yisu.com/zixun/access # r2 a1.sources.r2。类型=执行 a1.sources.r2.command=tail - f/home/hadoop/flume_data/nginx.log a1.sources.r2.interceptors=i2 a1.sources.r2.interceptors.i2.type=静态 a1.sources.r2.interceptors.i2.key=类型 a1.sources.r2.interceptors.i2.value=nginx # r3 a1.sources.r3。类型=执行 a1.sources.r3.command=tail - f/home/hadoop/flume_data/web.log a1.sources.r3.interceptors=i3 a1.sources.r3.interceptors.i3.type=静态 a1.sources.r3.interceptors.i3.key=类型 a1.sources.r3.interceptors.i3.value=网络 #描述沉 a1.sinks.k1。类型=avro a1.sinks.k1。主机名=hadoop03 a1.sinks.k1。端口=41414 #使用一个通道在内存中缓冲事件 a1.channels.c1。类型=内存 a1.channels.c1。能力=20000 a1.channels.c1。transactionCapacity=10000 # channela1.sources.r1绑定源和水槽。渠道=c? a1.sources.r2。渠道=c? a1.sources.r3。渠道=c? a1.sinks.k1。通道c1=代码>
<代码> # avro_source_hdfs_sink.properties #定义代理名、来源、渠道,水槽的名称 a1。=r1来源 a1。水槽=k1 a1。渠道=c? #定义来源 a1.sources.r1。类型=avro a1.sources.r1。绑定=0.0.0.0 a1.sources.r1。端口=41414 #添加时间拦截器 a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor $建设者 #定义渠道 a1.channels.c1。类型=内存 a1.channels.c1。能力=20000 a1.channels.c1。transactionCapacity=10000 #定义水槽 a1.sinks.k1。类型=hdfs a1.sinks.k1.hdfs.path=hdfs://myha01/源/日志/Y %{类型}/% % m % d a1.sinks.k1.hdfs。filePrefix=事件 a1.sinks.k1.hdfs。文件类型=DataStream数据 a1.sinks.k1.hdfs。writeFormat=文本 #时间类型 a1.sinks.k1.hdfs。useLocalTimeStamp=true #生成的文件不按条数生成 a1.sinks.k1.hdfs。rollCount=0 #生成的文件按时间生成 a1.sinks.k1.hdfs。rollInterval=30 #生成的文件按大小生成 a1.sinks.k1.hdfs。rollSize=10485760 #批量写入hdfs的个数 a1.sinks.k1.hdfs。batchSize=20 #水槽操作hdfs的线程数(包括新建,写入等) a1.sinks.k1.hdfs.threadsPoolSize=10 #操作hdfs超时时间 a1.sinks.k1.hdfs.callTimeout=30000 #组装源、渠道下沉 a1.sources.r1。渠道=c? a1.sinks.k1。通道c1=代码>
<强>测试强>:
<代码> #在hadoop01和hadoop02上的/home/hadoop/数据有数据文件access.log, nginx。日志,web.log #先启动hadoop03上的水槽:(存储) flume-ng代理- c conf - f avro_source_hdfs_sink。- name属性a1 -Dflume.root.logger=调试控制台 #然后在启动hadoop01和hadoop02上的命令水槽(收集) flume-ng代理- c conf - f exec_source_avro_sink。- name属性a1 -Dflume.root.logger=调试控制台代码>