Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF
主要有以下几类:
- 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
- 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
- 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
- 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据;
标量函数(Scalar Functions)
自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换,类似于hive中的UDF。
想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction,并实
现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有(public),
而且名字必须是 eval。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。
使用场景:求传入对象的哈希值
-
public
static
class
HashFunction
extends
ScalarFunction {
-
// 接受任意类型输入,返回 INT 型输出
-
public
int
eval
(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
-
return o.hashCode();
-
}
-
}
-
// 注册函数
-
tableEnv.createTemporarySystemFunction(
"HashFunction", HashFunction.class);
-
// 在 SQL 里调用注册好的函数
-
tableEnv.sqlQuery(
"SELECT HashFunction(myField) FROM MyTable");
表函数(Table Functions)
表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系,类似于Hive中的UDTF(窗口TVF本质上就是一个表函数)。让输入表中的每一行,与它转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了扩展。在 Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来实现的。
实现自定义的表函数,需要自定义类来继承抽象类 TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一个泛型参数T 的,这就是表函数返回数据的类型;而 eval()方法没有返回类型,内部也没有 return语句,是通过调用 collect()方法来发送想要输出的行数据的。
使用场景:对字段进行拆分,一行变多行。
-
// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
-
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
-
public
static
class
SplitFunction
extends
TableFunction<Row> {
-
public
void
eval
(String str) {
-
for (String s : str.split(
" ")) {
-
// 使用 collect()方法发送一行数据
-
collect(Row.of(s, s.length()));
-
}
-
}
-
}
-
// 注册函数
-
tableEnv.createTemporarySystemFunction(
"SplitFunction", SplitFunction.class);
-
// 重命名侧向表中的字段
-
tableEnv.sqlQuery(
-
"SELECT myField, newWord, newLength " +
-
"FROM MyTable " +
-
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
聚合函数(Aggregate Functions)
用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据
(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换,类似于Hive中的UDAF。聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。
自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数
<T, ACC>,T 表示聚合输出的结果类型,ACC 则表示聚合的中间状态类型,所以要创建一个累加器。
使用场景:计算加权平均数
-
// 累加器类型定义
-
public
static
class
WeightedAvgAccumulator {
-
public
long
sum
=
0;
// 加权和
-
public
int
count
=
0;
// 数据个数
-
}
-
// 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator
-
public
static
class
WeightedAvg
extends
AggregateFunction<Long,WeightedAvgAccumulator> {
-
@Override
-
public WeightedAvgAccumulator
createAccumulator
() {
-
return
new
WeightedAvgAccumulator();
// 创建累加器
-
}
-
@Override
-
public Long
getValue
(WeightedAvgAccumulator acc) {
-
if (acc.count ==
0) {
-
return
null;
// 防止除数为 0
-
}
else {
-
return acc.sum / acc.count;
// 计算平均值并返回
-
}
-
}
-
// 累加计算方法,每来一行数据都会调用
-
public
void
accumulate
(WeightedAvgAccumulator acc, Long iValue, Integer
-
iWeight) {
-
acc.sum += iValue * iWeight;
-
acc.count += iWeight;
-
}
-
}
-
-
// 注册自定义聚合函数
-
tableEnv.createTemporarySystemFunction(
"WeightedAvg", WeightedAvg.class);
-
// 调用函数计算加权平均值
-
Table
result
= tableEnv.sqlQuery(
"SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY student");
表聚合函数(Table Aggregate Functions)
用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另 一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。
自定义表聚合函数需要继承抽象类 TableAggregateFunction。TableAggregateFunction结构和原理与 AggregateFunction 非常类似,同样有两个泛型参数<T, ACC>,用一个 ACC 类型的累加器(accumulator)来存储聚合的中间结果。
表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输出。如果输入一条数据后,只是对结果表里一行或几行进行了更新(Update),这时我们重新计算整个表、全部输出显然就不够高效了。为了提高处理效率,TableAggregateFunction 还提供了一个 emitUpdateWithRetract()方法,它可以在结果表发生变化时,以“撤回”(retract)老数据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract()两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()。
-
// 聚合累加器的类型定义,包含最大的第一和第二两个数据
-
public
static
class
Top2Accumulator {
-
public Integer first;
-
public Integer second;
-
}
-
// 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组
-
public
static
class
Top2
extends
TableAggregateFunction<Tuple2<Integer, Integer>,
-
Top2Accumulator> {
-
@Override
-
public Top2Accumulator
createAccumulator
() {
-
Top2Accumulator
acc
=
new
Top2Accumulator();
-
acc.first = Integer.MIN_VALUE;
// 为方便比较,初始值给最小值
-
acc.second = Integer.MIN_VALUE;
-
return acc;
-
}
-
// 每来一个数据调用一次,判断是否更新累加器
-
public
void
accumulate
(Top2Accumulator acc, Integer value) {
-
if (value > acc.first) {
-
acc.second = acc.first;
-
acc.first = value;
-
}
else
if (value > acc.second) {
-
acc.second = value;
-
}
-
}
-
// 输出(数值,排名)的二元组,输出两行数据
-
public
void
emitValue
(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>>
-
out) {
-
if (acc.first != Integer.MIN_VALUE) {
-
out.collect(Tuple2.of(acc.first,
1));
-
}
-
if (acc.second != Integer.MIN_VALUE) {
-
out.collect(Tuple2.of(acc.second,
2));
-
}
-
}
-
}
-
-
// 注册表聚合函数函数
-
tableEnv.createTemporarySystemFunction(
"Top2", Top2.class);
-
// 在 Table API 中调用函数
-
tableEnv.from(
"MyTable")
-
.groupBy($(
"myField"))
-
.flatAggregate(call(
"Top2", $(
"value")).as(
"value",
"rank"))
-
.select($(
"myField"), $(
"value"), $(
"rank"));
目前 SQL 中没有直接使用表聚合函数的方式,所以需要使用 Table API 的方式来调用。这里使用了 flatAggregate()方法,它就是专门用来调用表聚合函数的接口。
转载:https://blog.csdn.net/qq_42456324/article/details/128163589