上篇博客给大家讲解了Flink的入门及dataSource 点击,本篇博客给讲解下Flink的17种常用的算子,本篇博客比较长,耐心看完(注意:面试经常被问到,建议收藏,如要对你有帮助的话麻烦,点赞 关注 评论)。Flink专栏
1、Map
需求: 将 DataSet 中的每一个元素转换为另外一个元素
示例: 使用 map 操作,将以下数据 “1,张三”, “2,李四”, “3,王五”, “4,赵六” 转换为一个 scala 的样例类。
实现步骤:
- 获取 ExecutionEnvironment 运行环境。
- 使用 FromCollection 构建数据源。
- 创建一个 User 样例类
- 使用 Map 将数据转化为样例类
- 打印输出
参考代码
import org.apache.flink.api.scala._
/**
* @author 需求: 使用Map将数据转换成样例类
* @date 2020/9/8 23:26
* @version 1.0
*/
object BatchMap {
case class User(name:String,age:String)
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements
val data = env.fromElements("张三,19", "李四,30", "刘恒,299")
//3.使用map将数据封装成样例类
val userDataSet = data.map(s => {
User(s.split(",")(0), s.split(",")(1))
})
//4.将数据输出
userDataSet.print()
}
}
2、FlatMap
需求: 将 DataSet 中的每一个元素转换为 0…n 个元素
实例: 分别将以下数据,转换成 国家 、省份 、城市 三个维度的数据。
将以下数据
张三,中国,江西省,南昌市
李四,中国,河北省,石家庄市
Tom,America,NewYork,Manhattan
转换为
张三,中国
张三,中国江西省
张三,中国江西省南昌市
解题思路
以上数据为一条转换为三条,显然,应当使用 flatMap 来实现 分别在 flatMap 函数中构建三个数据,并放入到一个列表中
显示结果
姓名, 国家
姓名, 国家省份
姓名, 国家省份城市
实现步骤:
- 构建批处理运行环境
- 构建本地集合数据源
- 使用 flatMap 将一条数据转换为三条数据
a. 使用逗号分隔字段
b. 分别构建国家、国家省份、国家省份城市三个元组 - 打印输出
代码实现:
import org.apache.flink.api.scala._
/**
* @author 需求:
* 将"张三,中国,江西省,南昌市",
* "李四,中国,河北省,石家庄市",
* "Tom,America,NewYork,Manhattan"
* 转换为:
* 张三,中国
* 张三,中国江西省
* 张三,中国江西省南昌市
* @date 2020/9/9 0:11
* @version 1.0
*/
object BachFlatMap {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromCollection构建数据集
val dataSource = env.fromCollection(List("张三,中国,江西省,南昌市", "李四,中国,河北省,石家庄市", "Tom,America,NewYork,Manhattan"))
val flatMap: DataSet[((String, String), (String, String), (String, String))] = dataSource.flatMap(line => {
val arr = line.split(",")
List(
((arr(0), arr(1)),
(arr(0), arr(1) + arr(2)),
(arr(0), arr(1) + arr(2) + arr(3))))
})
flatMap.print()
}
}
3、MapPartition
需求: 将一个分区中的元素转换为另一个元素
示例: 使用 mapPartition 操作,将以下数据 “1,张三”, “2,李四”, “3,王五”, “4,赵六” 转换为一个 scala 的样例类。
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 创建一个 User 样例类
- 使用 mapPartition 操作执行转换
- 打印测试
代码实现:
import org.apache.flink.api.scala._
/**
* @author 需求:将一天分区中的数据转换为一个样例类
* "1,张三", "2,李四", "3,王五", "4,赵六"
* @date 2020/9/9 21:57
* @version 1.0
*/
object BachMapPartition {
case class User(id:String,name:String)
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用FromElements构建数据集
val dataSource = env.fromElements("1,张三", "2,李四", "3,王五", "4,赵六")
//3.数据处理
val mapPartitionDS: DataSet[User] = dataSource.mapPartition(textPartition => {
textPartition.map( x => {
val arrs = x.split(",")
User(arrs(0), arrs(1))
})
})
//4.结果输出
mapPartitionDS.print()
}
}
4、Filter
需求: 过滤出来 一些符合条件的元素
Filter作用: Filter 函数在实际生产中特别实用,数据处理阶段可以过滤掉大部分不符合业务的内容, 可以极 大减轻整体 flink 的运算压力
实例: 使用filter过滤掉大于10的数字
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 filter 操作执行过滤
- 打印测试
参考代码
import org.apache.flink.api.scala._
/**
* @author 需求:使用filter过滤掉大于10的数字
* 过滤出来 一些符合条件的元素 Filter 函数在实际生产中特别实用,数据处理阶段可以过滤掉大部分不符合业务的内容,
* 可以极 大减轻整体 flink 的运算压力
* @date 2020/9/9 22:35
* @version 1.0
*/
object BachFilter {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建1-20的数据集
val dataSource = env.generateSequence(1, 20)
//3.处理数据
val filter = dataSource.filter(_ < 10)
//4.结果输出
filter.print()
}
}
5、Reduce
需求: 可以对一个 dataset 或者一个 group 来进行聚合计算,最终聚合成一个元素
实例: 请将以下元组数据,使用 reduce 操作聚合成一个最终结果 (“java” , 1) , (“java”, 1) ,(“java” , 1) 将上传元素数据转换为 (“java”,3)
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 redice 执行聚合操作
- 打印测试
参考代码:
import org.apache.flink.api.scala._
/**
* @author 需求:请将以下元组数据,使用 reduce 操作聚合成一个最终结果
* ("java" , 1) , ("java", 1) ,("java" , 1)
* @date 2020/9/9 22:39
* @version 1.0
*/
object BachReduce {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements 构建数据集
val dataSource = env.fromElements(("java", 1), ("java", 1), ("java", 2))
//3.数据处理(根据key进行分组)
val values = dataSource.groupBy(_._1)
//4.使用reduce进行合并
val reduce = values.reduce((v1, v2) => (v1._1, v2._2 + v1._2))
//4.结果输出
reduce.print()
}
}
6、ReduceGroup
可以对一个 dataset 或者一个 group 来进行聚合计算,最终聚合成一个元素 reduce 和 reduceGroup 的 区别
首先 groupBy 函数会将一个个的单词进行分组,分组后的数据被 reduce 一个个的拉 取过来,这种方式如果数据量大的情况下,拉取的数据会非常多,增加了网络 IO。
reduceGroup 是 reduce 的一种优化方案; 它会先分组 reduce,然后在做整体的 reduce;这样做的好处就是可以减少网络 IO。
示例: 请将以下元组数据,下按照单词使用 groupBy 进行分组,再使用 reduceGroup 操作进行单 词计数(“java” , 1) , (“java”, 1) ,(“scala” , 1)
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 groupBy 按照单词进行分组
- 使用 reduceGroup 对每个分组进行统计
- 打印测试
参考代码
import org.apache.flink.api.scala._
/**
* @author 请将以下元组数据,下按照单词使用 groupBy 进行分组,再使用 reduceGroup 操作进行单 词计数("java" , 1) , ("java", 1) ,("scala" , 1)
* @date 2020/9/11 22:15
* @version 1.0
*/
object BachReduceGroup {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val source = env.fromElements(("java", 1), ("Flin", 1), ("大数据", 1), ("java", 2))
//3.使用reduceGroup进行分组求和
val result = source.groupBy(0).reduceGroup(group => (group.reduce((a, b) => (a._1, a._2 + b._2))))
//4.输出
result.print()
}
}
7、Aggregate(重点)
介绍: 按照内置的方式来进行聚合。例如:SUM/MIN/MAX…
示例: 请将以下元组数据,使用 aggregate 操作进行单词统计 (“java”, 1), (“大数据”, 2), (“大数据”, 10)
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 groupBy 按照单词进行分组
- 使用 aggregate 对每个分组进行 SUM 统计
- 打印测试
参考代码
import org.apache.flink.api.scala._
/**
* @author 请将以下元组数据,使用 aggregate 操作进行单词统计 ("java", 1), ("大数据", 2), ("大数据", 10)
* @date 2020/9/11 22:30
* @version 1.0
*/
object BachAggregate {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements构建数据集
val sourceData = env.fromElements(("java", 1), ("大数据", 2), ("大数据", 10))
//3.使用groupBy进行分组然后使用aggregate求出最大值
val result = sourceData.groupBy(0).aggregate(Aggregations.MAX, 1)
//4.结果输出
result.print()
}
}
8、minBy 和 maxBy
介绍: 获取指定字段的最大值、最小值
示例: 请将以下元组数据,使用 aggregate 操作进行单词统计 (1, “yuwen”, 89.0) , (2, “shuxue”, 92.2),(3, “yingyu”, 89.99),(4, “wuli”, 98.9), (1, “yuwen”, 88.88),(1, “wuli”, 93.00),(1, “yuwen”, 94.3)
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 groupBy 按照单词进行分组
- 使用 maxBy、minBy对每个分组进行操作
- 打印测试
参考代码:
import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.util.Random
/**
* @author
* @date 2020/9/11 22:40
* @version 1.0
*/
object BachMinByAndMaxBy {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val data = new mutable.MutableList[(Int, String, Double)]
data += ((1, "liuheng", 89.0))
data += ((2, "shuxue", 92.2))
data += ((3, "yingyu", 89.99))
data += ((4, "wuli", 98.9))
data += ((1, "yuwen", 88.88))
data += ((1, "wuli", 93.00))
data += ((1, "yuwen", 94.3))
val sourceData = env.fromCollection(Random.shuffle(data))
//3.使用MinBy求出最小值与MaxBy求出最大值
val min = sourceData.groupBy(1).minBy(2)
val max = sourceData.groupBy(1).maxBy(2)
//4.输出最小值
min.print()
println("-----------------------------")
//5.输出最大值
max.print()
}
}
9、Distinct
介绍: 去除重复的数据
示例: 请将以下元组数据,使用 distinct 操作去除重复的单词 (“java” , 1) , (“java”, 1) ,(“scala” , 1) 去重得到 (“java”, 1), (“scala”, 1)
实现步骤:
- 获取 ExecutionEnvironment 运行环境
- 使用 fromCollection 构建数据源
- 使用 distinct 指定按照哪个字段来进行去重
- 打印测试
参考代码:
import org.apache.flink.api.scala._
/**
* @author 需求:使用distinct求("java", 1), ("java", 2), ("scala", 1) 去掉重复的数据
* @date 2020/9/12 22:56
* @version 1.0
*/
object BachDistinct {
def main(args: Array[String]): Unit = {
//1.构建运行环境(上下文对象)
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements
val dataSource = env.fromElements(("java", 1), ("java", 2), ("scala", 1))
//3.使用distinct去掉重复的
val distinct = dataSource.distinct(0)
//4.结果输出
distinct.print()
}
}
10、Join
介绍: 使用 join 可以将两个 DataSet 连接起来
示例: 有两个 csv 文件,有一个为 score.csv ,一个为 subject.csv ,分 别保存了成绩数据以及学科数据
sorce.csv
1,语数
2,英物
3,化生
4,文学
5,语理
6,学物
subject.csv
1,张三,1,98
2,张三,2,77.5
3,张三,3,89
4,张三,4,65
5,张三,5,78
6,张三,6,70
9,李四,3,65
10,李四,4,78
11,李四,5,70
12,李四,6,78
13,王五,1,70
14,王五,2,78
实现步骤:
- 分别将资料中的两个文件复制到项目中的 data/ 中
- 构建批处理环境
- 创建两个样例类
a. 学科 Subject(学科 ID、学科名字)
b. 成绩 Score(唯一 ID、学生姓名、学科 ID、分数——Double 类型) - 分别使用 readCsvFile 加载 csv 数据源,并制定泛型
- 使用 join 连接两个 DataSet,并使用 where 、 equalTo 方法设置关联条件
- 打印关联后的数据源
参开代码:
import org.apache.flink.api.scala._
/**
* @author 需求:使用join的方式将sorce.csv文件与subject.csv文件进行关联
* @date 2020/9/12 23:38
* @version 1.0
*/
object BachJoin {
//构建样例类
case class sorce(id:String,subject:String)
case class subject(id:String,name:String,sid:String,source:String)
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集(使用文件方式)
val sorce = env.readCsvFile[sorce]("./data/score.csv")
val subject = env.readCsvFile[subject]("./data/subject.csv")
//3.使用join将两个文件中的数据进行关联
val joinData = sorce.join(subject).where(_.id).equalTo(_.sid)
//4.结果输出
joinData.print()
}
}
11、LeftOuterJoin
介绍: 左外连接,左边的 Dataset 中的每一个元素,去连接右边的元素
示例: 请将以下元组数据
(用户 id,用户姓名)
(1, “zhangsan”) ,
(2, “lisi”) ,
(3 , “wangwu”) ,
(4 , “zhaoliu”)
元组数据
(用户 id,所在城市)
(1, “beijing”),
(2, “shanghai”),
(4, “guangzhou”)
返回如下数据:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
参考代码
import scala.collection.mutable.ListBuffer
/**
* @author 需求:使用左连接 请将以下元组数据(用户 id,用户姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元组数据
* (用户 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下数据:
* (3,wangwu,null)
* (1,zhangsan,beijing)
* (2,lisi,shanghai)
* (4,zhaoliu,guangzhou)
* @date 2020/9/15 23:30
* @version 1.0
*/
object BachLeftOuterJoin {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.创建要测试的数据集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection构建数据集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用leftOuterJoin 进行关联
val result = test1.leftOuterJoin(test2).where(0).equalTo(0).apply((first, second)=>{
if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.结果输出
result.print()
}
}
12、RightOuterJoin
实例: 右外连接,左边的 Dataset 中的每一个元素,去连接左边的元素
示例: 请将以下元组数据
(用户 id,用户姓名)
(1, “zhangsan”) ,
(2, “lisi”) ,
(3 , “wangwu”) ,
(4 , “zhaoliu”)
元组数据
(用户 id,所在城市)
(1, “beijing”),
(2, “shanghai”),
(4, “guangzhou”)
返回如下数据:
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
参考代码
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:使用左连接 请将以下元组数据(用户 id,用户姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元组数据
* (用户 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下数据:
* (1,zhangsan,beijing)
* (4,zhaoliu,guangzhou)
* (2,lisi,shanghai)
* @date 2020/9/15 23:30
* @version 1.0
*/
object BachRightOuterJoin {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.创建要测试的数据集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection构建数据集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用rightOuterJoin 进行关联
val result = test1.rightOuterJoin(test2).where(0).equalTo(0).apply((first, second)=>{
if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.结果输出
result.print()
}
}
13、fullOuterJoin
介绍: 全外连接,左右两边的元素,全部连接
示例: 请将以下元组数据
(用户 id,用户姓名)
(1, “zhangsan”) ,
(2, “lisi”) ,
(3 , “wangwu”) ,
(4 , “zhaoliu”)
元组数据
(用户 id,所在城市)
(1, “beijing”),
(2, “shanghai”),
(4, “guangzhou”)
返回如下数据:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
扩展:
- OPTIMIZER_CHOOSES: 将选择权交予Flink优化器,相当于没有给提示;
- BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第 二个输入端作为探索端,选择这种策略的场景第一个输入端规模很小;
- BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端 作为探索端,选择这种策略的场景是第二个输入端的规模很小;
- REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第 一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这 两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已 存在的分区以及排序 顺序可被使用时系统默认采用的策略;
- REPARTITION_HASH_SECOND: 该策略会导致两个输入端都会被重分区,但会基于 第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量 小于第一个输入端的情况;
- REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。 该策略适用于一个或两个输入端都已 排过序的情况;
参考代码:
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:
* 请将以下元组数据(用户 id,用户姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元组数据(用户 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下数据:
* (3,wangwu,null)
* (1,zhangsan,beijing)
* (2,lisi,shanghai)
* (4,zhaoliu,guangzhou)
* @date 2020/9/15 23:43
* @version 1.0
*/
object BachFullOuterJoin {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.创建要测试的数据集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection构建数据集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用fullOuterJoin 进行关联
val result = test1.fullOuterJoin(test2,JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0).apply((first, second)=>{
if (first==null){
(second._1,"null",second._2)
} else if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.结果输出
result.print()
}
}
14、cross
介绍: 和 join 类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存 的操作;
示例: 请将以下元组数据 (1, 4, 7), (2, 5, 8), (3, 6, 9)
元组数据 (10, 40, 70), (20, 50, 80), (30, 60, 90)
进行笛卡尔积,返回如下数据:
Buffer(((1,4,7),(10,40,70)), ((1,4,7),(20,50,80)), ((1,4,7),(30,60,90)), ((2,5,8),(10,40,70)), ((2,5,8),(20,50,80)), ((2,5,8),(30,60,90)), ((3,6,9),(10,40,70)), ((3,6,9),(20,50,80)), ((3,6,9),(30,60,90)))
参考代码:
import org.apache.flink.api.scala._
/**
* @author
* @date 2020/9/15 23:50
* @version 1.0
*/
object BatchCross {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
println("==============================cross=======================================")
cross(env)
println("==============================crossWithTiny=======================================")
crossWithTiny(env)
println("==============================crossWithHuge=======================================")
crossWithHuge(env)
}
/**
* 笛卡尔集
*
* @param env
*/
def cross(env: ExecutionEnvironment) = {
//1.使用 fromElements定义两个dataSet
val data1 = env.fromElements((1, 4, 7), (2, 5, 8), (3, 6, 9))
val data2 = env.fromElements((10, 40, 70), (20, 50, 80), (30, 60, 90))
val result = data1.cross(data2)
println(result.collect())
}
/**
* 暗示第二个输入较小的交叉
*
* @param env
*/
def crossWithTiny(env: ExecutionEnvironment) = {
//1.定义 case class
case class Coord(id: Int, x: Int, y: Int)
val data1: DataSet[Coord] = env.fromElements( Coord(2, 5, 8), Coord(1, 4, 7),Coord(3, 6, 9))
val data2: DataSet[Coord] = env.fromElements( Coord(20, 50, 80),Coord(10, 40, 70), Coord(30, 60, 90))
val result = data1.crossWithTiny(data2)
result.print()
}
def crossWithHuge(env: ExecutionEnvironment) = {
//1.定义 case class
case class Coord(id: Int, x: Int, y: Int)
val data1: DataSet[Coord] = env.fromElements(Coord(1, 4, 7), Coord(2, 5, 8), Coord(3, 6, 9))
val data2: DataSet[Coord] = env.fromElements(Coord(10, 40, 70), Coord(20, 50, 80), Coord(30, 60, 90))
val result = data1.crossWithHuge(data2)
result.print()
}
}
15、Union
介绍: 将多个 DataSet 合并成一个 DataSet【注意】:union 合并的 DataSet 的类型必须是一致 的
示例:
将以下数据进行取并集操作
数据集
1 “hadoop”, “hive”, “flume”
数据集 2
“hadoop”, “hive”, “spark”
实现步骤:
- 构建批处理运行环境
- 使用 fromCollection 创建两个数据源
- 使用 union 将两个数据源关联在一起
- 打印测试
参考代码:
import org.apache.flink.api.scala._
/**
* @author 需求:
* 将以下数据进行取并集操作
* 数据集
* 1 "hadoop", "hive", "flume"
* 数据集
* 2 "hadoop", "hive", "spark
* @date 2020/9/16 0:05
* @version 1.0
*/
object BachUnion {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val data1 = env.fromCollection(List("hadoop", "hive", "flume"))
val data2 = env.fromCollection(List("hadoop", "hive", "spark"))
val result = data1.union(data2)
result.print()
}
}
16、Rebalance(重点)
介绍:
Flink 也有数据倾斜的时候,比如当前有数据量大概 10 亿条数据需要处理,在处理过程中 可能会 发生如图所示的状况:
这个时候本来总体数据量只需要 10 分钟解决的问题,出现了数据倾斜,机器 1 上的 任务需 要 4 个小时才能完成,那么其他 3 台机器执行完毕也要等待机器 1 执行完毕后才 算整体将任务完成;所以在实际的工作中,出现这种情况比较好的解决方案就是—rebalance(内 部使用 round robin 方法将数据均匀打散。这对于数据倾斜时是很 好的选择。)
实现步骤:
- 构建批处理运行环境
- 使用 env.generateSequence 创建 0-100 的并行数据
- 使用 fiter 过滤出来 大于 8 的数字
- 使用 map 操作传入 RichMapFunction ,将当前子任务的 ID 和数字构建成一个元组
- 在 RichMapFunction 中可以使用 getRuntimeContext.getIndexOfThisSubtask 获取子 任务序号
- 打印测试
代码实现:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
/**
* @author 对数据集进行再平衡,重分区,消除数据倾斜
* @date 2020/9/16 0:21
* @version 1.0
*/
object BatchRebalance {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val data = env.generateSequence(0, 100)
val ds = data.filter(_ > 8)
//3.对数据进行在平衡操作
val value1 = ds.rebalance().map(new RichMapFunction[Long, (Int, Long)] {
override def map(value: Long): (Int, Long) = {
(getRuntimeContext.getIndexOfThisSubtask, value)
}
})
//4.结果输出
value1.print()
}
}
17、First(重点)
介绍: 根据给定的 key 对一个数据集取前 N 条数据(往往在公司中是经常用到了,比如头条中的热搜Top10)
实现步骤:
- 构建批处理运行环境
- 使用 fromCollection 构建测试数据集
- 使用 first 获取前 N 条数据
- 打印测试
参考代码:
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:根据给定的 key 对一个数据集取前 N 条数据
* @date 2020/9/16 19:07
* @version 1.0
*/
object BachFirst {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val data = ListBuffer[Tuple2[Int,String]]()
data.append((2,"zs"))
data.append((4,"ls"))
data.append((3,"ww"))
data.append((1,"xw"))
data.append((1,"aw"))
data.append((1,"mw"))
val text = env.fromCollection(data)
//3.使用first去前三条数据
val first = text.first(3)
val sortFirst = text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3)
//4.结果数据
first.print()
sortFirst.print()
}
}
转载:https://blog.csdn.net/qq_43791724/article/details/108628216