飞道的博客

flink为会什么成为下一代数据处理框架--大数据面试

482人阅读  评论(0)

相对于传统的数据处理模式。流式数据处理则有更高的处理效率和成本控制。apache flink 就是近年来在开源社区发展不断发展能够支持同时支持高吞吐,低延迟,高性能分布式处理框架。

了解一个完整的apache flink sql job的组成部分,以及flink sql所提供的核心算子的语义,最后会应用tumblewindow编写一个end to end 的页面访问统计示例。

apache flink sql job的组成

我们做任何计算都离不读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于apache flink sql 编写的计算job也离不开这个三部分。如下所示:

如上所示,一个完整的 Apache Flink SQL Job 由如下三部分:

source operator -- source operator 是对外部数据源的抽象,目前apache flink内置很多常用的数据源实现,比如上图提到的kafka

query operators 查询算子主要完成如图query logic,目前支持union join 

projection difference ,intersection 以及windows 等大多数传统数据库支持的操作。

sink operator - sink operator 是对外结果的抽象,目前apache flink 也内置了很多常用的结果表的抽象,如上图提到的kafka

apache flink Sql 核心算子

sql 是structuredquery language 的缩写,最初

2.1 select 

select 用于从数据集/流中选择数据,对关系进行垂直分割,消去这些列。

一个使用select 的语句如下:

select cola,colc from tab

2.2 where 

where 用于从数据集/流中过滤数据,与select 一起使用,语法遵循ansi-sql 标准,语义关系代数的selection,根据某些条件对关系做水平分割,即选择符合条件的记录。

2.3 group by 

group by 是对数据进行分组的操作,比如我需要分别计算下一个学生表里面女生和男生人数分别是多少

2.7.1 OverWindow

 

OVER Window 目前支持由如下三个元素组合的 8 种类型:

 

  • 时间 – ProcessingTime 和 EventTime 

  • 数据集 – Bounded 和 UnBounded

  • 划分方式 – ROWS 和 RANGE 我们以的Bounded ROWS 和 Bounded RANGE 两种常用类型,想大家介绍 Over Window 的语义

 

  • Bounded ROWS Over Window

 

Bounded ROWS OVER Window 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。

 

语法

 


  
  1. SELECT
  2. agg1(col1) OVER(
  3. [ PARTITION BY (value_expression1,..., value_expressionN)]
  4. ORDER BY timeCol
  5. ROWS
  6. BETWEEN ( UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
  7. ...
  8. FROM Tab1

 

  • value_expression – 进行分区的字表达式;

  • timeCol – 用于元素排序的时间字段;

  • rowCount – 是定义根据当前行开始向前追溯几行元素;

 

语义

 

我们以 3 个元素(2PRECEDING)的窗口为例,如下图:

 

 

上图所示窗口 user 1 的 w5 和 w6, user 2 的 窗口 w2 和 w3,虽然有元素都是同一时刻到达,但是他们仍然是在不同的窗口,这一点有别于 RANGEOVER Window.

 

  • Bounded RANGE Over Window

 

Bounded RANGE OVER Window 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口;

 

语法

 

Bounded RANGE OVER Window 的语法如下:

 


  
  1. SELECT
  2. agg1(col1) OVER(
  3. [ PARTITION BY (value_expression1,..., value_expressionN)]
  4. ORDER BY timeCol
  5. RANGE
  6. BETWEEN ( UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
  7. ...
  8. FROM Tab1

 

  • value_expression – 进行分区的字表达式; 

  • timeCol – 用于元素排序的时间字段;

  • timeInterval – 是定义根据当前行开始向前追溯指定时间的元素行;

 

语义

 

我们以 3 秒中数据(INTERVAL‘2’ SECOND)的窗口为例,如下图:

 

 

 

注意: 上图所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一时刻到达,他们是在同一个窗口,这一点有别于 ROWS OVER Window.

 

2.7.2 GroupWindow

 

根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Winodw:

 

  • Tumble – 滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

  • Hop – 滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

  • Session – 会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加;

 

说明:Aapche Flink 还支持 UnBounded的 Group Window,也就是全局 Window,流上所有数据都在一个窗口里面,语义非常简单,这里不做详细介绍了。

 

GroupWindow 的语法如下:

 


  
  1. SELECT
  2. [gk],
  3. agg1(col1),
  4. ...
  5. aggN(colN)
  6. FROM Tab1
  7. GROUP BY [ WINDOW(definition)], [gk]

 

[WINDOW(definition)] – 在具体窗口语义介绍中介绍。

 

  • Tumble Window

 

Tumble 滚动窗口有固定 size,窗口数据不重叠,具体语义如下:

 

假设我们要写一个 2 分钟大小的 Tumble,示例SQL如下:

 


  
  1. SELECT gk, COUNT(*) AS pv
  2. FROM tab
  3. GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE), gk

 

  • Hop Window

 

Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠,具体语义如下:

 

假设我们要写一个统计连续的两个访问用户之间的访问时间间隔不超过 3 分钟的的页面访问量(PV).

 


  
  1. SELECT gk, COUNT(*) AS pv
  2. FROM tab
  3. GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), gk

 

  • Session Window

 

Session 会话窗口 是没有固定大小的窗口,通过 session 的活跃度分组元素。不同于滚动窗口和滑动窗口,会话窗口不重叠,也没有固定的起止时间。一个会话窗口在一段时间内没有接收到元素时,即当出现非活跃间隙时关闭。一个会话窗口 分配器通过配置 session gap 来指定非活跃周期的时长,具体语义如下:


 

假设我们要写一个统计连续的两个访问用户之间的访问时间间隔不超过 3 分钟的的页面访问量(PV).

 


  
  1. SELECT gk, COUNT(*) AS pv
  2. FROM pageAccessSession_tab
  3. GROUP BY SESSION(rowtime, INTERVAL '3' MINUTE), gk

 

说明:很多场景用户需要获得 Window 的开始和结束时间,上面的 GroupWindow的SQL 示例中没有体现,那么窗口的开始和结束时间应该怎样获取呢? Apache Flink 我们提供了如下辅助函数:

 

  • TUMBLE_START/TUMBLE_END

  • HOP_START/HOP_END

  • SESSION_START/SESSION_END

 

这些辅助函数如何使用,请参考如下完整示例的使用方式。

 上面我们介绍了 Apache Flink SQL 核心算子的语法及语义,这部分将选取Bounded EventTime Tumble Window 为例为大家编写一个完整的包括 Source 和 Sink 定义的 ApacheFlink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户 ID 和访问时间。我们需要按不同地域统计每 2 分钟的淘宝首页的访问量(PV). 具体数据如下:

3.1 Source 定义

 

自定义 Apache Flink Stream Source 需要实现 StreamTableSource, StreamTableSource 中通过 StreamExecutionEnvironment 的 addSource 方法获取 DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生 WaterMark,也就是要实现 DefinedRowtimeAttributes 接口。出于代码篇幅问题,我们如下只介绍核心部分,完整代码 请查看: EventTimeTumbleWindowDemo.scala

3.1.1 Source Function 定义

 

支持接收携带 EventTime 的数据集合,Either 的数据结构 Right 是 WaterMark,Left 是元数据:

 


  
  1. class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  2. override def run(ctx: SourceContext[ T]): Unit = {
  3. dataList.foreach {
  4. case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
  5. case Right(w) => ctx.emitWatermark( new Watermark(w)) // emit watermark
  6. }
  7. }
  8. }
 

3.1.2 定义 StreamTableSource

 

我们自定义的 Source 要携带我们测试的数据,以及对应的 WaterMark 数据,具体如下:

 


  
  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  2. // 页面访问表数据 rows with timestamps and watermarks
  3. val data = Seq(
  4. // Data
  5. Left( 1510365660000L, Row.of( new JLong( 1510365660000L), "ShangHai", "U0010")),
  6. // Watermark
  7. Right( 1510365660000L),
  8. ..
  9. )
  10. val fieldNames = Array( "accessTime", "region", "userId")
  11. val schema = new TableSchema(fieldNames, Array( Types. SQL_TIMESTAMP, Types. STRING, Types. STRING))
  12. val rowType = new RowTypeInfo(
  13. Array( Types. LONG, Types. STRING, Types. STRING).asInstanceOf[ Array[ TypeInformation[_]]],
  14. fieldNames)
  15. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[ Row] = {
  16. // 添加数据源实现
  17. execEnv.addSource( new MySourceFunction[ Row](data)).setParallelism( 1).returns(rowType)
  18. }
  19. ...
  20. }
 

3.4 Sink 定义

 

我们简单的将计算结果写入到 Apache Flink 内置支持的 CSVSink 中,定义 Sink 如下:

 


  
  1. def getCsvTableSink: TableSink[Row] = {
  2. val tempFile = ...
  3. new CsvTableSink(tempFile.getAbsolutePath).configure(
  4. Array[ String]( "region", "winStart", "winEnd", "pv"),
  5. Array[TypeInformation[_]](Types. STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  6. }
 

 

3.5 构建主程序

 

主程序包括执行环境的定义,Source / Sink 的注册以及统计查 SQL 的执行,具体如下:

 


  
  1. def main(args: Array[ String]): Unit = {
  2. // Streaming 环境
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment
  4. val tEnv = TableEnvironment.getTableEnvironment(env)
  5. // 设置EventTime
  6. env.setStreamTimeCharacteristic( TimeCharacteristic. EventTime)
  7. //方便我们查出输出数据
  8. env.setParallelism( 1)
  9. val sourceTableName = "mySource"
  10. // 创建自定义source数据结构
  11. val tableSource = new MyTableSource
  12. val sinkTableName = "csvSink"
  13. // 创建CSV sink 数据结构
  14. val tableSink = getCsvTableSink
  15. // 注册source
  16. tEnv.registerTableSource(sourceTableName, tableSource)
  17. // 注册sink
  18. tEnv.registerTableSink(sinkTableName, tableSink)
  19. val sql =
  20. "SELECT " +
  21. " region, " +
  22. " TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
  23. " TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
  24. " FROM mySource " +
  25. " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"
  26. tEnv.sqlQuery(sql).insertInto(sinkTableName);
  27. env.execute()
  28. }

 

3.6 执行并查看运行结果

 

执行主程序后我们会在控制台得到 Sink 的文件路径,如下:

 

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

 

Cat 方式查看计算结果,如下:

 


  
  1. jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/ 88/ 8n 406qmx 2z 73qvrzc_rbtv_r 0000gn/T/csv_sink_ 8025014910735142911tem
  2. ShangHai, 2017- 11- 11 02: 00: 00. 0, 2017- 11- 11 02: 02: 00. 0, 1
  3. BeiJing, 2017- 11- 11 02: 00: 00. 0, 2017- 11- 11 02: 02: 00. 0, 1
  4. BeiJing, 2017- 11- 11 02: 10: 00. 0, 2017- 11- 11 02: 12: 00. 0, 2
  5. ShangHai, 2017- 11- 11 04: 10: 00. 0, 2017- 11- 11 04: 12: 00. 0, 1

转载:https://blog.csdn.net/wangjunji34478/article/details/117407747
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场