飞道的博客

Flume之配置文件编写步骤[鹏哥出品,必属精品]

260人阅读  评论(0)

Flume


一、写flume的步骤

1.1 画拓扑图

总结:一个channel只能输出一个结果文件。

一个flume agent 由 source + channel + sink 构成,类比于mapper + shuffer + reducer。

1.1.1 确定source类型

	常用类型:
    1) arvo:  用于Flume agent 之间的数据源传递
    2) netcat: 用于监听端口
    3exec: 用于执行linux中的操作指令
    4) spooldir: 用于监视文件或目录
    5) taildir: 用于监视文件或目录,同时支持追加的监听
	总结 ,3/4/5三种方式,最常用的是5,适合用于监听多个实时追加的文件,并且能够实现断点续传。

1.1.2 确定channel selector 的选择器

    1)replicating channel selector:复制,每个channel发一份数据 -- 默认的选择器
    2) multiplexing channel selector : 根据配置配件,指定source源获取的数据发往一个或多个channel

1.1.3 确认channel类型参数

    1) Memory Channel : 加载在内存中,存在数据丢失的风险 -- 学习阶段使用此参数
    2File Channel :落入磁盘

1.1.4 确定sinkprocessor参数

    1) DefaultSinkProcessor:对应的是单个的Sink
    2) LoadBalancingSinkProcessor :对应的是多个的Sink,可以实现负载均衡的功能
    3) FailoverSinkProcessor :对应的是多个的Sink,容错功能,先指定一个sink,所有的数据都走指定的sink,当sink故障以后,其他的sink顶上,如果开始sink恢复了,那么数据继续走原有指定的sink。

1.1.5 确定sink的类型

    常使用的类型有:
    1) avro: 用于输出到下一个Flume Agent ,一个开源的序列化框架
    2) hdfs: 输出到hdfs
    3) fill_roll: 输出到本地
    4) logger: 输出到控制台
    5) hbase: 输出到hbase

1.1.6 拓扑例图

1.2 写配置文件

1.2.1 配置文件的构成

  1. Name the components on this agent
  2. channel selector
  3. Describe/configure the source
  4. Describe the sink
  5. Use a channel which buffers events in memory
  6. Bind the source and sink to the channel

1.2.2 agent Name

情况1:source、channel、sink各一个

a1.sources = r1
a1.sinks = k1
a1.channels = c1

情况2:source一个、channel一个、sink多个

a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

情况3:source一个、channel多个、sink多个

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

1.2.3 source

情况1:avro

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102 -- hosename
a2.sources.r1.port = 4141 -- 端口号

情况2:netcat

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost -- 连接的节点 如果是0.0.0.0,则表示任意机器都可以
a1.sources.r1.port = 44444  -- 端口号

情况3:exec

a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log -->linux执行的命令
a2.sources.r2.shell = /bin/bash -c -- linux的解析器

情况4: sqooldir

# Describe/configure the source
a3.sources.r3.type = spooldir -- 定义source类型
a3.sources.r3.spoolDir = /opt/module/flume/upload -- 定义监控的文件或目录
a3.sources.r3.fileSuffix = .COMPLETED -- 定义文件上传后的后缀
a3.sources.r3.fileHeader = true -- 是否有文件头
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

情况5:talidir

# Describe/configure the source
a4.sources.r4.type = TAILDIR 
a4.sources.r4.positionFile = /opt/module/flume/tail_dir.json -- 指定position_file 的位置
a4.sources.r4.filegroups = f1 f2 -- 监控的文件目录集合
a4.sources.r4.filegroups.f1 = /opt/module/flume/files/.*file.* -- 定义监控的文件目录1
a4.sources.r4.filegroups.f2 = /opt/module/flume/files/.*log.* -- 定义监控的文件目录2

1.2.4 channel selector

情况1: replicating channel selector

# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

情况2:multiplexing channel selector 需配合指定的拦截器使用(interceptor)

-- 指定拦截器
a1.sources.r1.interceptors = i1 -- 指定拦截器的名称
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
-- 指定拦截器的类型 = 自定义拦截器中builder的实现类的全类名

-- 指定channel的选择器
a1.sources.r1.selector.type = multiplexing  -- 定义channel的选择器类型
a1.sources.r1.selector.header = type  -- 自定义拦截器的header的k
a1.sources.r1.selector.mapping.letter = c1 -- letter是k一个值,相同的letter进入一个channel中
a1.sources.r1.selector.mapping.number = c2 -- number是k一个值,相同的number进入一个channel中

1.2.5 channel

情况1: memory

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000 --表示channel总容量为1000个event
a2.channels.c1.transactionCapacity = 100 -- 表示channel传输时收集到的100条event

情况2 : flie -----暂时不讨论

1.2.6 sinkprocessor

情况1:DefaultSinkProcessor

不用写任何配置信息,默认值。

情况2:FailoverSinkProcessor

a1.sinkgroups.g1.processor.type = failover -- 指定类型
a1.sinkgroups.g1.processor.priority.k1 = 5 --设置K1的sink的优先级
a1.sinkgroups.g1.processor.priority.k2 = 10 --设置K2的sink的优先级
a1.sinkgroups.g1.processor.maxpenalty = 10000 -- 设置故障的转换时间10s。默认值为30s

情况3:LoadBalancingSinkProcessor

a1.sinkgroups.g1.processor.type =load_balance -- 指定类型
a1.sinkgroups.g1.processor.backoff = true -- 暂不讨论
a1.sinkgroups.g1.processor.selector =round_robin -- 暂不讨论

1.2.6 sink

情况1:avro

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104 -- hosaname
a2.sinks.k1.port = 4141  -- 端口

情况2:hdfs

a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H   -- 上传到HDFS的路径
#上传文件的前缀
a4.sinks.k4.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a4.sinks.k4.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k4.hdfs.roundValue = 1
#重新定义时间单位
a4.sinks.k4.hdfs.roundUnit = hour
#是否使用本地时间戳
a4.sinks.k4.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a4.sinks.k4.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k4.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k4.hdfs.rollInterval = 60  -- 单位是秒
#设置每个文件的滚动大小大概是128M
a4.sinks.k4.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a4.sinks.k4.hdfs.rollCount = 0

情况3:fill_roll

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/datas/flume3 -- 指定上传到本地的路径

情况4:logger

# Describe the sink
a1.sinks.k1.type = logger

情况5:hbase —暂时不讨论

1.2.7 连接source、channel、sink

情况1:source、channel、sink各一个、

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

情况2:source一个、channel一个、sink多个

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

情况3:source一个、channel多个、sink多个

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

1.2.8 端口和ip的区别

  • sink端:向指定ip地址的端口发送数据
端口:
ip(hostname):
  • source端:监视指定端口并接收指定ip发送来的数据
端口:该端口只能是自己机器的端口
ip(hostname):指能够接受来自此ip的数据

1.3 连接flume

1.3.1 查看指定ip的通信端口

netstat -ntlp | grep 端口号

1.3.2 关闭端口

sudo kill 端口的进程号

1.3.3 连接指定ip地址的指定端口

nc ip 端口号

1.3.4 启动flume

bin/flume-ng agent -n [agent name] -c conf -f [自定义flume配置文件] -Dflume.root.logger=INFO,console

二、自定义interceptor,source、 sink

2.1 自定义intercepor

package flume_interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * @author lianzhipeng
 * @Description
 * @create 2020-05-05 10:45
 */

public class MyInterceptor implements Interceptor {

    /**
     * Description: 初始化方法,新建Interceptor时使用
     *
     * @Author: lianzhipeng
     * @Date: 2020/5/5 10:45
     * @return: void
     */
    public void initialize() {

    }

    /**
     * Description: 更改方法,对event进行处理
     *
     * @param event 传入的数据
     * @Author: lianzhipeng
     * @Date: 2020/5/5 10:47
     * @return: org.apache.flume.Event 返回处理好的数据
     */
    public Event intercept(Event event) {

        //获取event的header
        Map<String, String> headers = event.getHeaders();

        //获取event的body

        byte[] body = event.getBody();

        //处理数据

        String string = new String(body);
        char c = string.charAt(0);

        if (c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z') {
            headers.put("type", "char");
        } else {
            headers.put("type", "not-char");
        }

        //返回数据

        return event;


    }

    public List<Event> intercept(List<Event> list) {

        for (Event event : list) {
            intercept(event);
        }
        return list;


    }

    public void close() {

    }

    /**
     * 框架会调用MyBulider来创建自定义拦截器实例
     */
    public static class MyBulider implements Builder {

        /**
         * Description: 创建自定义拦截器实例的方法
         *
         * @Author: lianzhipeng
         * @Date: 2020/5/5 10:54
         * @return: org.apache.flume.interceptor.Interceptor
         */
        public Interceptor build() {
            return new MyInterceptor();
        }

        /**
         * Description: 读取配置信息
         *
         * @param context
         * @Author: lianzhipeng
         * @Date: 2020/5/5 10:54
         * @return: void
         */
        public void configure(Context context) {

        }
    }
}

2.2 自定义source

package flume_interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/**
 * @author lianzhipeng
 * @Description
 * @create 2020-05-05 14:31
 */

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String prefix;
    private Long interval;


    /**
     * Description:拉取事件并交给ChannelProcessor处理的方法
     *
     * @Author: lianzhipeng
     * @Date: 2020/5/5 14:33
     * @return: org.apache.flume.PollableSource.Status
     */
    public Status process() throws EventDeliveryException {
        Status status = null;

        try {
            // 通过外部方法拉取数据
            Event e = getSomeData();

            // Store the Event into this Source's associated Channel(s)
            ChannelProcessor channelProcessor = getChannelProcessor();

            channelProcessor.processEvent(e);

            status = Status.READY;
        } catch (Throwable t) {
            // Log exception, handle individual exceptions as needed

            status = Status.BACKOFF;

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error)t;
            }
        }

        return status;
    }


    /**
     * Description:拉取数据并包装成event的过程
     * @Author: lianzhipeng
     * @Date: 2020/5/5 14:55
     * @return: org.apache.flume.Event 拉取到的数据
    */
    private Event getSomeData() throws InterruptedException {

        int i = (int) (Math.random() * 1000);

        //添加前缀
        String message = prefix + i ;

        Thread.sleep(interval);
        //
        SimpleEvent event = new SimpleEvent();

        event.setBody(message.getBytes());
        return  event;

    }
    /**
     * Description: 如果拉取不到数据,backoff时间的增长速度
     *
     * @Author: lianzhipeng
     * @Date: 2020/5/5 14:34
     * @return: long 增长量
     */
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    /**
     * Description: 最大的等待时间
     *
     * @Author: lianzhipeng
     * @Date: 2020/5/5 14:38
     * @return: long
     */
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Description:配置参数,来自于configurable,可以定义我们自己定义的source
     *
     * @param context 配置文件
     * @Author: lianzhipeng
     * @Date: 2020/5/5 14:39
     * @return: void
     */
    public void configure(Context context) {

        prefix = context.getString("prefff","xxxx" );
        interval = context.getLong("interval",500L);

    }
}

2.3 自定义sink

package flume_interceptor;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.IOException;

/**
 * @author lianzhipeng
 * @Description
 * @create 2020-05-05 14:31
 */

public class MySiink extends AbstractSink implements Configurable {
    /**
     * Description: 改方法调用时,会从Channel中拉取数据并处理
     *
     * @Author: lianzhipeng
     * @Date: 2020/5/5 15:09
     * @return: org.apache.flume.Sink.Status 处理的状态
     */
    public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        //获取channel
        Channel ch = getChannel();
        //拉取数据的事务
        Transaction txn = ch.getTransaction();
        //开始拉取
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do

            //拉取的数据,如果拉取不到,则返回null
            Event event;
            //如果拉取的数据为null,则等0.1秒后继续拉取数据,知道拉取数据
            while ((event = ch.take()) == null) {
                Thread.sleep(100);
            }

            // Send the Event to the external repository.
            //如果拉取到了数据,将数据进行处理
            storeSomeData(event);

            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();

            // Log exception, handle individual exceptions as needed

            status = Status.BACKOFF;

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            //拉取事务的关闭
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) throws IOException {

        //获取event的body数据
        byte[] body = event.getBody();
        //将数据写出到控制台
        System.out.write(body);
        System.out.println();
    }


    public void configure(Context context) {

    }
}




转载:https://blog.csdn.net/Mamba_victor/article/details/105939189
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场