飞道的博客

大数据小白必知必会之Flume实现过滤器效果

547人阅读  评论(0)

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

        在差不多一年前,菌刚接触Flume那会,写了一篇关于Flume的博客。今天无意间翻到,才发现当时介绍的内容是多么的浅显,于是菌打算再为大家介绍如何在Flume中实现过滤器的操作。

        码字不易,先赞后看!



Flume过滤器

        

1、案例场景

        A、B两台日志服务机器实时生产日志主要类型为 access.lognginx.logweb.log

        现在要求:

        把A、B 机器中的access.lognginx.logweb.log 采集汇总到C机器上然后统一收集到hdfs中。

        但是在hdfs中要求的目录为:

/source/logs/access/20180101/**
/source/logs/nginx/20180101/**
/source/logs/web/20180101/**

2、场景分析

3、数据流程处理分析

4、实现

服务器A对应的IP为 192.168.100.100
服务器B对应的IP为 192.168.100.110
服务器C对应的IP为 192.168.100.120

采集端配置文件开发

        node01与node02服务器开发flume的配置文件

[root@node01 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf
[root@node01 conf]# vim exec_source_avro_sink.conf
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
##  static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

注:

        定义拦截器的类型
        a1.sources.r1.interceptors = i1
        a1.sources.r1.interceptors.i1.type = static

        需要注意的是:static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value键值对

        例如:

        a1.sources.r1.interceptors.i1.key = type
        a1.sources.r1.interceptors.i1.value = access

服务端配置文件开发

        在node03上面开发flume配置文件

[root@node03 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf
[root@node03 conf]# vim avro_source_hdfs_sink.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.100.120
a1.sources.r1.port =41414

#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.100.100:8020/source/logs/%{
   type}/%Y%m%d

#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

采集端文件生成脚本

        为了方便观察采集的结果,我们分别在node01和node02上开发shell脚本,模拟数据生成。


[root@node01 servers]# cd /export/shells/
[root@node01 shells]# vim server.sh
#!/bin/bash
while true
do
 date >> /export/taillogs/access.log;
 date >> /export/taillogs/web.log;
 date >> /export/taillogs/nginx.log;
  sleep 0.5;
done

顺序启动服务

        node03启动flume实现数据收集

[root@node03 conf]# cd /export/servers/apache-flume-1.8.0-bin
[root@node03 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

        node01与node02启动flume实现数据监控

[root@node01 shells]# cd /export/servers/apache-flume-1.8.0-bin
[root@node01 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

        node01 与 node02 启动生成文件脚本

cd /export/shells
sh server.sh

5、效果实现截图

        查看hdfs指定的输出路径下的目录构成

        查看某个目录下的文件构成

        可以发现随着我们shell脚本的启动,数据被不断的追加到指定的监控文件中,node01和node02在检测到变化之后,将变化的内容在node03进行汇总,然后node03根据定义的不同生产日志类型,对于进行“过滤”输出到HDFS的不同目录下。

小结

        本篇博客作者简单为大家介绍了一种Flume拦截器的使用示例,如果有看不太懂的小伙伴建议与这篇文章一起食用。当然,都看过的小伙伴也许也会感觉还是少了些内容,例如Flume的自定义监控选择器Sink组,还有Flume的web端监控——Ganglia等等…大家Duck不必担心,接下来的几篇博客,博主将详细介绍这些关于Flume的“干货”,敬请期待!!!

        受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波🙏

        希望我们都能在学习的道路上越走越远😉


转载:https://blog.csdn.net/weixin_44318830/article/details/108612688
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场