9.8 ETL
9.8.1概念描述
-  ETL理解 - ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,
- 目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据
 
-  设计分为3部分 - 数据抽取
- 数据的清洗转换
- 数据的加载
 
-  对3部分的理解 - 在设计ETL的时候我们也是从这三部分出发。数据的抽取是从各个不同的数据源抽取到ODS(OperationalData Store,操作型数据存储)中——这个过程也可以做一些数据的清洗和转换),在抽取的过程中需要挑选不同的抽取方法,尽可能的提高ETL的运行效率。
- ETL三个部分中,花费时间最长的是“T”(Transform,清洗、转换)的部分,一般情况下这部分工作量是整个ETL的2/3。数据的加载一般在数据清洗完了之后直接写入DW(DataWarehousing,数据仓库)中去。
- 各模块可灵活进行组合,形成ETL处理流程。
 
9.8.2 模块介绍
- 数据抽取【Extract】 
  - 确定数据源,需要确定从哪些源系统进行数据抽取
- 定义数据接口,对每个源文件及系统的每个字段进行详细说明
- 确定数据抽取的方法:是主动抽取还是由源系统推送?是增量抽取还是全量抽取?是按照每日抽取还是按照每月抽取?
 
- 数据清洗转换【Transform】 
  - 通常的做法是从业务系统到ODS做清洗,将脏数据和不完整数据过滤掉,在从ODS到DW的过程中转换,进行一些业务规则的计算和聚合
- 数据清洗 
    - 数据清洗的任务是过滤那些不符合要求的数据,将过滤的结果交给业务主管部门,确认是否过滤掉还是由业务单位修正之后再进行抽取。
- 不符合要求的数据主要是有不完整的数据、错误的数据、重复的数据三大类。
 
- 数据转换 
    - 数据转换的任务主要进行不一致的数据转换、数据粒度的转换,以及一些商务规则的计算。
- 空值处理:可捕获字段空值,进行加载或替换为其他含义数据,或数据分流问题库
- 数据标准:统一元数据、统一标准字段、统一字段类型定义
- 数据拆分:依据业务需求做数据拆分,如身份证号,拆分区划、出生日期、性别等
- 数据验证:时间规则、业务规则、自定义规则
- 数据替换:对于因业务因素,可实现无效数据、缺失数据的替换
- 数据关联:关联其他数据或数学,保障数据完整性
 
 
- 数据装载【Load】 
  - 装载主要是将经过转换的数据装载到数据仓库里面,可以通过直连数据库的方式来进行数据装载,可以充分体现高效性。
- 在应用的时候可以随时调整数据抽取工作的运行方式,可以灵活的集成到其他管理系统中。
 
9.8.3 ETL工具

-  对于ETL工具的理解 - ETL是数据整合解决方案,说小了,就是倒数据的工具
- 常见的ETL工具 
    - sqoop
- DataX
- Kettle
- Canal
- StreamSets
 
 
-  sqoop - 理解 
    - 是Apache开源的一款在Hadoop和关系数据库服务器之间传输数据的工具
- 将一个关系型数据库(MySQL ,Oracle等)的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导出到关系型数据库中
 
- 本质 
    - sqoop命令的本质是转化为MapReduce程序。
 
- 步骤 
    - sqoop分为导入(import)和导出(export)
 
- 策略分为table和query
- 模式分为增量和全量
  
- 理解 
    
-  DataX - 理解 
    - DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台
- 实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、、DRDS 等各种异构数据源之间高效的数据同步功能
 
  
- 理解 
    
-  Kettle - 一款国外免费开源的、可视化的、功能强大的ETL工具,纯java编写
- 可以在Windows、Linux、Unix上运行,数据抽取高效稳定。
 
-  canal - canal是阿里巴巴旗下的一款开源项目,纯Java开发。
- 基于数据库增量日志解析,提供增量数据实时订阅和消费,目前主要支持了MySQL,也支持mariaDB
  
-  StreamSets - 是大数据实时采集ETL工具,可以实现不写一行代码完成数据的采集和流转。
- 通过拖拽式的可视化界面,实现数据管道(Pipelines)的设计和定时任务调度。
- 创建一个Pipelines管道需要配置数据源(Origins)、操作(Processors)、目的地(Destinations)三部分
 
9.8.4 加载策略
- 系统日志分析方式 
  - 通过分析数据库自身的日志来判断变化的数据
 
- 触发器方式 
  - 直接进行数据加载
- 利用增量日志表进行增量加载
 
- 时间戳方式 
  - 在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。
 
- 全表比对方式 
  - 全表比对即在增量抽取时,ETL 进程逐条比较源表和目标表的记录,将新增和修改的记录读取出来。
 
- 源系统增量(delta)数据直接或者转换后加载 
  - 日常的 ETL 更新中,还会遇到目标表的数据来源来自于多张源表,通过关键字段的拼接进行更新操作。
- 如果多张源表都有时间戳字段,可以利用时间戳进行增量更新,另外还可以采用全表比对的方式进行增量更新
 
9.9 常见概念描述
9.9.1 数据仓库
- 概念:
 数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理中的决策制定。
- 特点:
 首先,数据仓库用于支持决策,面向分析型数据处理,它不同于企业现有的操作型数据库;
 其次,数据仓库是对多个异构的数据源有效集成,集成后按照主题进行了重组,并包含历史数据,而且存放在数据仓库中的数据一般不再修改。
- 应用场景:
 一般都是作为商业智能系统、数据仪表盘等可视化报表服务的数据源。
- 数据仓库是一个功能概念,是将企业的各业务系统产生的基础数据,通过维度建模的方式,将业务数据划分为多个主题(集市)统一存储,统一管理。
9.9.2 数据集市

- 概念:
 数据集市可以理解为是一种"小型数据仓库",它只包含单个主题,且关注范围也非全局。数据集市可以分为两种:
- 分类:
 独立数据集市,这类数据集市有自己的源数据库和ETL架构;
 非独立数据集市,这种数据集市没有自己的源系统,它的数据来自数据仓库。
- 应用场景:
 数据集市是数仓之上更聚焦的业务主题合集,更偏向于应对业务数据快速高效应
 用的需求,一般用于商业智能系统中探索式和交互式数据分析应用。
- 数据集市是一个结构概念,它是企业级数据仓库的一个子集,主要面向部门级业务,
 并且只面向某个特定的主题。
9.9.3 数据孤岛

数据孤岛理解
- 业务系统之间各自为政、相互独立造成的数据孤岛,体现在业务不集成、流程不互通、数据不共享
9.9.4 数据湖

- 概念:
 2010年,Pentaho首席技术官James Dixon创造了“数据湖”一词。
 他把数据集市描述成一瓶清洗过的、包装过的和结构化易于使用的水。
 数据湖更像是在自然状态下的水,数据流从源系统流向这个湖。用户可以在数据湖里校验,取样或完全的使用数据。
- 特点:
 从源系统导入所有的数据,没有数据流失。数据存储时没有经过转换或只是简单的处理。数据转换和定义schema 用于满足分析需求。
- 应用场景:
 以大数据技术为基础有多样化数据结构海量大数据存储需求,也可作为数据仓库或者数据集市的数据源。
- 数据湖是一种数据存储理念,存储企业各种各样的原始数据的大型仓库,包括结构
 化、非结构、二进制图像、音频、视频等等
9.9.5 数据中台

- 概念: 
  - 数据中台是指通过企业内外部多源异构的数据采集、治理、建模、分析,应用,使数据对内优化管理提高业务,对外可以数据合作价值释放,成为企业数据资产管理中枢。数据中台建立后,会形成数据API,为企业和客户提供高效各种数据服务。
 
- 特点: 
  - 利用大数据技术,对海量数据进行统一采集、计算、存储,并使用统一的数据规范进行管理,将企业内部所有数据统一处理形成标准化数据,挖掘出对企业最有价值的数据,构建企业数据资产库,提供一致的、高可用大数据服务。
- 数据中台不是一套软件,也不是一个信息系统,而是一系列数据组件的集合,企业基于自身的信息化建设基础、数据基础以及业务特点对数据中台的能力进行定义,基于能力定义利用数据组件搭建自己的数据中台。
 
- 应用场景:
 是将数据服务化提供给业务系统,目的是将数据能力渗透到业务各个环节,不限于决策分析。
- 数据中台是一个逻辑概念,为业务提供服务的主要方式是数据API,它包括了数据仓库,大数据、数据治理领域的内容。
9.9.6 宽表窄表
- 宽表
- 窄表
9.10 大数据架构
9.10.1互联网大数据平台

9.10.2 lambda架构
-  架构的提出 - Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(NathanMarz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验
 
-  Lambda架构的优势 - Lambda 架构使开发人员能够构建大规模分布式数据处理系统。它具有很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性
 
-  Lambda架构组成 - 批处理层(Batch Layer) 
    - 批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。
- 批处理层使用可处理大量数据的分布式处理系统预先计算结果。它通过处理所有的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来重新计算的,能够修复任何错误,然后更新现有的数据视图。输出通常存储在只读数据库中,更新则完全取代现有的预先计算好的视图。
 
- 速度处理层(Speed Layer) 
    - 速度处理层会实时处理新来的大数据。
- 速度层通过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后立即可用。而当同样的数据在批处理层处理完成后,在速度层的数据就可以被替代掉了
 
- 服务层(Serving Layer) 
    - 响应查询
- 批处理层和速度层处理完的结果都输出存储在服务层中,服务层通过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询
 
  
- 批处理层(Batch Layer) 
    
-  Lambda缺点 - Lambda 架构使用起来十分灵活,并且可以适用于多种应用场景,但在实际应用中,Lambda 架构也存在着一些不足,主要表现在它的维护很复杂。
- 使用 Lambda 架构时,架构师需要维护两个复杂的分布式系统,并且保证他们逻辑上产生相同的结果输出到服务层中。
- 维护 Lambda 架构的复杂性在于我们要同时维护两套系统架构:批处理层和速度层。我们已经说过了,在架构中加入批处理层是因为从批处理层得到的结果具有高准确性,而加入速度层是因为它在处理大规模数据时具有低延时性。
- 改进批处理层的系统让它具有更低的延时性,又或者是改进速度层的系统,让它产生的数据视图更具准确性和更加接近历史数据
 
9.10.3 Kappa架构
- 对lambda架构的改进提出 
  - Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的作者之一,也是现在 Confluent 大数据公司的 CEO。
 
- Kappa架构的步骤 
  - 第一步,部署 Apache Kafka,并设置数据日志的保留期(Retention Period)。这里的保留期指的是你希望能够重新处理的历史数据的时间区间。 
    - 例如,如果你希望重新处理最多一年的历史数据,那就可以把 ApacheKafka 中的保留期设置为 365 天。如果你希望能够处理所有的历史数据,那就可以把 Apache Kafka 中的保留期设置为“永久(Forever)”。
 
- 第二步,如果我们需要改进现有的逻辑算法,那就表示我们需要对历史数据进行重新处理。 
    - 我们需要做的就是重新启动一个 Apache Kafka 作业实例(Instance)。这个作业实例将从头开始,重新计算保留好的历史数据,并将结果输出到一个新的数据视图中。我们知道 Apache Kafka 的底层是使用 Log Offset 来判断现在已经处理到哪个数据块了,所以只需要将 Log Offset 设置为 0,新的作业实例就会从头开始处理历史数据。
 
- 第三步,当这个新的数据视图处理过的数据进度赶上了旧的数据视图时,我们的应用便可以切换到从新的数据视图中读取。
- 第四步,停止旧版本的作业实例,并删除旧的数据视图。与 Lambda 架构不同的是,Kappa 架构去掉了批处理层这一体系结构,而只保留了速度层。你只需要在业务逻辑改变又或者是代码更改的时候进行数据的重新处理。
 
- 第一步,部署 Apache Kafka,并设置数据日志的保留期(Retention Period)。这里的保留期指的是你希望能够重新处理的历史数据的时间区间。 
    
- Kappa架构的缺点 
  - 因为 Kappa 架构只保留了速度层而缺少批处理层,在速度层上处理大规模数据可能会有数据更新出错的情况发生,这就需要我们花费更多的时间在处理这些错误异常上面。
- Kappa 架构的批处理和流处理都放在了速度层上,这导致了这种架构是使用同一套代码来处理算法逻辑的。所以 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。
 
9.11 数仓规范设计
目的在于约束N个人对齐认知,按照一个标准或流程进行开发,以保证数据一致性,流程清晰且稳定。
 提高开发效率,提升质量,降低沟通对齐成本,降低运维成本等
9.11.1 表命名规范
9.11.2 开发规范
9.11.3 流程规范
9.12 数仓元数据
9.12.1 业务元数据
- 理解
9.12.2 技术元数据
9.12.3 管理元数据
十、Flume
10.1 Flume的简介
Flume是一个分布式、可靠、和高可用的海量日志采集、汇聚和传输的系统。
Flume可以采集文件,socket数据包(网络端口)、文件夹、kafka、mysql数据库等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
一般的采集、传输需求,通过对flume的简单配置即可实现;不用开发一行代码!
Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景
10.1.1 Flume概述
- Flume定义 
  - Flume是一个分布式、可靠、和高可用的海量日志采集、汇聚和传输的系统。
- 支持在系统中定制各类数据发送方,用于收集数据
- 同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力
 
- Flume的作用 
  - Flume可以采集文件,socket数据包(网络端口)、文件夹、kafka、mysql数据库等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
 
10.1.2 Flume使用场景
- Flume解决的问题 
  - 线上数据一般主要是落地(存储到磁盘)或者通过socket传输给另外一个系统,这种情况下,你很难推动线上应用或服务去修改接口,实现直接向kafka里写数据,这时候你可能就需要flume这样的系统帮你去做数据传输。
 
10.1.3 Flume的体系架构

1. 核心的组件
-  Client - Client生产数据,运行在一个独立的线程
 
-  Event -  一个数据单元,由消息头和消息体组成 
-  Events可以是日志记录、avro对象等 
 
-  
-  Flow - Event从源点到达目的点的迁移的抽象
 
-  Agent -  一个独立的Flume进程,是Flume最小的运行单位,包含组件Source、Channel、Sink 
-  Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks 
 
-  
-  Source -  数据收集组件 
-  source从Client收集数据,然后将数据封装成Event传递给Channel 
-  支持多种Source格式 - AVRO:其他的Flume的Sink
- exec:Linux命令产生的数据
- Spooling Directory:文件目录
 
 
-  
-  Channel -  中转Event的一个临时存储,保存由Source组件传递来的Event(缓存,生产者消费者) 
-  Channel连接 sources 和 sinks ,这个有点像一个消息队列 
-  为了数据的安全和效率,可以考虑多种缓存的手段 - Memory:内存
- JDBC:数据库
- File:文件
 
 
-  
-  Sink -  数据的输出方,可以根据自己的需求将Event输出到任意的位置 - HDFS
- AVRO:输出到其他的Flume
- Kafka
 
-  从Channel中读取并移除Event,然后将Event传递到FlowPipeline中的下一个Agent(如果有的话) 
-  Sink从Channel收集数据,运行在一个独立线程。 
 
-  
-  Interceptor - 拦截器
- 可以根据业务的需求拦截指定的数据
 

将Agent1中的Event传递给下一个Agent到Agent2
 
2. Flume的采集的大致流程
10.1.4 Flume的组件详解
1. Agent结构
-  理解Agent - Flume运行的核心是Agent,flume采集系统就是由一个个的Agent连接起来所形成的额一个或简单或复杂的数据传输通道
 
-  Agent的作用 - 对于每一个Agent来说,他就是一个独立的守护进程(JVM),它负责从数据源接收数据,并发往下一个目的地
 
-  Agent的三个组件 - source:数据源
- channel:临时存储数据的管道
- sink:目的地
  
2. Source
-  理解Source - 数据源,通过Source组件可以指定让Flume读取哪里的数据,然后将数据传递给后面的channel
- 数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中
 
-  Flume内置的source格式 - Flume提供了很多内置的Source, 支持 读取多种数据源,包括Avro, log4j, syslog 和http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource
- 如果内置的Source无法满足需要, Flume还支持自定义Source。
  
-  几个常用的数据源的介绍 - NetCat Source 
    - 绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入
- 参数介绍 
      - type:source的类型,必须是netcat。
- bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
- port:绑定的本地的端口。
 
 
- Avro Source 
    - 监听一个avro服务端口,采集Avro数据序列化后的数据
- 参数介绍 
      - type:avrosource的类型,必须是avro。
- bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
- port:绑定的本地的端口。
 
 
- Exec Source 
    - 于Unix的command在标准输出上采集数据
- 参数介绍 
      - type:source的类型:必须是exec。
- command:要执行命令。
 
 
- Spooling Directory Source 
    - 监听一个文件里的文件新增,如果有则采集作为source
- 参数介绍 
      - type:source 的类型:必须是spooldir
- spoolDir:监听的文件夹 【提前创建目录】
- fileSuffix:上传完毕后文件的重命名后缀,默认为.COMPLETE
- deletePolicy:上传后的文件的删除策略never和immediate,默认为never。
- fileHeader:是否要加上该文件的绝对路径在header里,默认是false。
- basenameHeader:是否要加上该文件的名称在header里,默认是false。
 
 
 
- NetCat Source 
    
3. Channel
-  理解 - 用来缓存source递过来的数据,并形成一个个event事件,等待sink来拿
- 可以把channel理解为一个临时存储数据的管道
- 它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件
 
-  Channel的类型 - 内存 Memory Channel
- 内存+文件(磁盘)Spillable Memory Channel
- 文件 File Channel
- JDBC
  
-  几种常用的Channel - Memory Channel 使用内存作为数据的存储 
    - Type channel的类型:必须为memory
- capacity:channel中的最大event数目
- transactionCapacity:channel中允许事务的最大event数目
 
- File Channel 使用文件作为数据的存储 
    - Type channel的类型:必须为 file
- checkpointDir :检查点的数据存储目录【提前创建目录】
- dataDirs :数据的存储目录【提前创建目录】
- transactionCapacity:channel中允许事务的最大event数目
 
- Spillable Memory Channel 使用内存作为channel超过了阀值就存在文件中
 
- Memory Channel 使用内存作为数据的存储 
    
4. Sink
-  理解 - 数据输出,读取Channel整理好的一个个event并传送(存储)到指定地方
 
-  输出目的地 - 文件系统、数据库、Hadoop存数据,也可以是其他agent的source
- Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据,也可以是其他agent的Source。
- 在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。
- 注意:Channel中的数据直到进入目的地才会被删除,当Sink写入目的地失败后,可以自动重写, 不会造成数据丢失,这块是有一个事务保证的
 
-  Sink类型  
-  几个常用的Sink - HDFS Sink:将数据传输到hdfs集群中。比较常见,主要是针对实时计算场景 
    - type:sink的类型 必须是hdfs。
- hdfs.path:hdfs的上传路径。
- hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData
- hdfs.rollInterval:间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。
- hdfs.rollSize:文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。
- hdfs.rollCount:event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。*
- hdfs.batchSize:每次往hdfs里提交多少个event,默认为100
- hdfs.fileType:hdfs文件的格式主要包括:SequenceFile, DataStream,CompressedStream,如果使用了CompressedStream就要设置压缩方式。
- hdfs.codeC:压缩方式:gzip, bzip2, lzo, lzop, snappy
- 注:%{host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。
 
- logger Sink 将数据作为日志处理(根据flume中的设置的日志方式来显示) 
    - 将数据作为日志处理,可以选择打印到控制台或者写到文件中,这个主要在测试的时候使用
- 要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console 。
- type:sink的类型:必须是 logger。
- maxBytesToLog:打印body的最长的字节数 默认为16
 
- Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。 
    - type:sink的类型:必须是 avro。
- hostname:指定发送数据的主机名或者ip
- port:指定发送数据的端口
 
- File Roll Sink:数据发送到本地文件。 
    - type:sink的类型:必须是 file_roll。
- sink.directory:存储文件的目录【提前创建目录】
- batchSize:一次发送多少个event。默认为100
- sink.rollInterval:多久产生一个新文件,默认为30s。单位是s。0为不产生新文件【即使没有数据也会产生文件】
 
 
- HDFS Sink:将数据传输到hdfs集群中。比较常见,主要是针对实时计算场景 
    
5. Interceptor
-  拦截器作用 - 当我们需要对数据进行过滤时,除了我们在Source、 Channel和Sink进行代码修改之外, Flume为我们提供了拦截器,拦截器也是chain形式的
 
-  拦截器的位置 - 拦截器的位置在Source和Channel之间
- 当我们为Source指定拦截器后,我们在拦截器中会得到event,根据需求我们可以对event进行保留还是抛弃,抛弃的数据不会进入Channel中。
  
-  几种常见的拦截器 - Timestamp Interceptor 时间戳拦截器 在header里加入key为timestamp,value为当前时间。 
    - type:拦截器的类型,必须为timestamp
- preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false
 
- Host Interceptor 主机名或者ip拦截器,在header里加入ip或者主机名 
    - type:拦截器的类型,必须为host
- preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false
- useIP:如果设置为true则使用ip地址,否则使用主机名,默认为true
- hostHeader:使用的header的key名字,默认为host
 
- Static Interceptor 静态拦截器,是在header里加入固定的key和value。 
    - type:avrosource的类型,必须是static。
- preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false
- key:静态拦截器添加的key的名字
- value:静态拦截器添加的key对应的value值
 
 
- Timestamp Interceptor 时间戳拦截器 在header里加入key为timestamp,value为当前时间。 
    
10.1.5 Flume特性和优点
1. Flume的复杂流动性
-  理解 - Flume允许用户构建多跳流程,其中事件在到达最终目的地之前会通过多个代理传播。
- 它还允许扇入和扇出流,上下文路由和备份路由(故障转移)
 
-  对于跨代理的处理 - 为了使数据跨多个代理或跃点流动,前一个代理的接收器和当前跃点的源必须为avro类
 型,接收器指向源的主机名(或IP地址)和端口
  - 这可以在Flume中实现,方法是为多个第一层代理配置一个avro接收器,它们均指向单个代理的avro源(同样,在这种情况下,您可以使用节俭的源/接收器/客户端)。第二层代理上的此源将接收到的事件合并到一个通道中,该通道由接收器消耗到其最终目的地。
  - Flume支持将事件流复用到一个或多个目的地。这是通过定义一种流多路复用器来实现的,该流多路复用器可以将事件复制或选择性地路由到一个或多个通道
  
- 为了使数据跨多个代理或跃点流动,前一个代理的接收器和当前跃点的源必须为avro类
2. 优点
- 数据产生者和数据收容器之间的缓存调节(消息中间件) 
  - 当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据
 
- 消息的可靠发送 
  - Flume的管道是基于事务,使用了两个事务模型(sender + receiver),保证了数据在传送和接收时的一致性.
- Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的事件传递。一旦事务中所有的数据全部成功提交到channel,那么source才认为该数据读取完成。同理,只有成功被sink写出去的数据,才会从channel中移除。
- Sender保证只要能成功提交进来的数据就绝对不会少,百分百进入管道。Receiver向输出提交,只要返回成功,就可以把数据移除管道。
- Flume可以保证不丢数据,如此严格的事务管理,可能导致数据重复,而且慢。(事务的两个极端:要么是保证不会重复,但是可能丢数据。要么是保证不会丢数据,但是可能重复。 事务的两个管理的方法,一个是保证消息的不重复,一个是保证消息的可靠性)
 
- Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的(可以根据生产需要自行定义一个数据来源端或者终点端)
- 除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如Facebook、Twitter、电商网站如亚马逊等
- 支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。
10.1.6 Flume执行流程

1 Source 接受数据
2 Channel Processor 处理 Event
3 Channel Processor 将 Event 传递给interceptor链对 Event 进行过滤操作
4 过滤完之后再把 Event 发送回 Channel Prodessor
5 Channel Processor把 Event 发送给Channel selectors
6 Channel selector返回Event 属于哪个Channel
7 根据第6步返回的结果,将Event发送到指定的Channel
8 SinkProcessor从Channel中拉去数据
9 最后把数据Sink出去
10.1.7 Flume事务

- 推送事务流程 
  - doPut: 把批数据写入到临时缓冲区putList
- doCommit: 检查Channel容量是否足够,如果容量足够则把putList里的数据发送Channel
- doRollBack:如果Channel容量不够,则把数据回滚到putList
 
- 拉取事务流程
 doTake:把数据读取到临时缓冲区takeList
 doCommit:检查数据是否发送成功,成功的话,则把event从takeList中移除
 doRollBack:如何发送失败,则把takeList的数据回滚数据到Channel
- 可靠 
  - 只有当sink接收到,数据落地完成的信息之后,才会将数据从通道中删除。
- 事件在每个代理上的一个通道中上游。然后将事件传递到流中的下一个代理或终端存储库(如HDFS)。仅将事件存储在下一个代理程序的通道或终端存储库中之后,才将其从通道中删除。这就是Flume中单跳消息传递语义如何提供流的端到端可靠性的方式。
- 数据传输的方式不是byte,而是一个个的event Flume使用事务性方法来确保事件的可靠传递。源和接收器分别在事务中封装存储在通道中或由通道提供的事务中提供的事件的存储/检索。这确保了事件集在流中从点到点可靠地传递。在多跳流的情况下,来自上一跳的接收器和来自下一跳的源均运行其事务,以确保将数据安全地存储在下一跳的通道中。
 
- 可恢复 
  - 当数据丢失了,只有从存储在磁盘的方式,才能将数据找回 事件在通道中上演,该通道管理从故障中恢复。
- Flume支持持久的文件通道,该通道由本地文件系统支持。还有一个内存通道可以将事件简单地存储在内存队列中,这虽然速度更快,但是当代理进程死亡时,仍保留在内存通道中的任何事件都无法恢复
 
转载:https://blog.csdn.net/weixin_50627985/article/details/125566782
