
文章目录
Presto自定义函数和JDBC连接
一、Presto 自定义函数
我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。
1、UDF函数
自定义UDF函数及使用可以按照下面步骤来实现。
1.1、创建Maven项目,加入如下依赖
-
<dependency>
-
<groupId>com.facebook.presto
</groupId>
-
<artifactId>presto-spi
</artifactId>
-
<version>0.259
</version>
-
</dependency>
-
<dependency>
-
<groupId>com.facebook.presto
</groupId>
-
<artifactId>presto-array
</artifactId>
-
<version>0.259
</version>
-
</dependency>
-
<dependency>
-
<groupId>io.airlift
</groupId>
-
<artifactId>stats
</artifactId>
-
<version>0.163
</version>
-
</dependency>
-
-
<build>
-
<plugins>
-
-
<plugin>
-
<artifactId>maven-assembly-plugin
</artifactId>
-
<version>2.4
</version>
-
<configuration>
-
<!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
-
<!--<appendAssemblyId>false</appendAssemblyId>-->
-
<descriptorRefs>
-
<descriptorRef>jar-with-dependencies
</descriptorRef>
-
</descriptorRefs>
-
<archive>
-
<manifest>
-
<mainClass>com.lw.java.myflink.Streaming.example.FlinkReadSocketData
</mainClass>
-
</manifest>
-
</archive>
-
</configuration>
-
<executions>
-
<execution>
-
<id>make-assembly
</id>
-
<phase>package
</phase>
-
<goals>
-
<goal>assembly
</goal>
-
</goals>
-
</execution>
-
</executions>
-
</plugin>
-
</plugins>
-
</build>
1.2、创建Presto注册插件类
-
package com.lansonjy.prestocode;
-
import com.facebook.presto.spi.Plugin;
-
import com.google.common.collect.ImmutableSet;
-
-
import java.util.Set;
-
//Presto 注册自定义函数的类,此类需要继承Plugin接口
-
public
class
MyFunctionsPlugin
implements
Plugin {
-
@Override
-
public Set<Class<?>> getFunctions()
-
{
-
return ImmutableSet.<Class<?>>builder()
-
//注册UDF,这里填写对应的UDF类
-
.add(MyUDF.class)
-
.build();
-
}
-
}
1.3、创建“MyUDF”类,实现自定义UDF逻辑
这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:
-
package com.lansonjy.prestocode;
-
-
import com.facebook.presto.spi.function.Description;
-
import com.facebook.presto.spi.function.ScalarFunction;
-
import com.facebook.presto.spi.function.SqlType;
-
import com.facebook.presto.spi.type.StandardTypes;
-
import io.airlift.slice.Slice;
-
import io.airlift.slice.Slices;
-
-
//自定义UDF函数
-
public
class
MyUDF {
-
//自定义UDF函数使用时的名称
-
@ScalarFunction("myudf")
-
//函数的描述
-
@Description("转换字母大写为小写")
-
//指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
-
@SqlType(StandardTypes.VARCHAR)
-
public
static Slice
lowercase
(@SqlType(StandardTypes.VARCHAR) Slice in)
-
{
-
String
argument
= in.toStringUtf8();
-
return Slices.utf8Slice(argument.toLowerCase());
-
}
-
}
1.4、创建“resources”资源目录
在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:
com.lansonjy.prestocode.MyFunctionsPlugin
1.5、将项目打包,上传到Presto集群
将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。
1.6、使用自定义UDF函数
-
#登录Presto客户端
-
./presto --server node3:8080 --catalog mysql --schema presto_db
-
-
#查询所有函数
-
presto:presto_db> show
functions;

-
#使用这个函数查询转换数据
-
presto:presto_db> select myudf(
'ABCDEF');
-
_col0
-
--------
-
abcdef
-
(1 row)
2、UDAF函数
UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:
2.1、在项目中创建“MyUDAF”类
-
package com.lansonjy.prestocode;
-
-
import com.facebook.presto.spi.block.BlockBuilder;
-
import com.facebook.presto.spi.function.*;
-
import com.facebook.presto.spi.type.DoubleType;
-
import com.facebook.presto.spi.type.StandardTypes;
-
-
//presto 自定义聚合函数实现-实现平均数计算
-
//自定义聚合函数使用时的名称
-
@AggregationFunction("myudaf")
-
//自定义聚合函数注释
-
@Description("我的自定义聚合函数,实现计算平均数")
-
public
class
MyUDAF {
-
//输入数据注释
-
@InputFunction
-
public
static
void
input
(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
-
//针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
-
state.setLong(state.getLong() +
1);
-
state.setDouble(state.getDouble() + value);
-
}
-
-
//聚合数据注释
-
@CombineFunction
-
public
static
void
combine
(LongAndDoubleState state, LongAndDoubleState otherState) {
-
//将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
-
state.setLong(state.getLong() + otherState.getLong());
-
state.setDouble(state.getDouble() + otherState.getDouble());
-
}
-
-
//输出数据注释
-
@OutputFunction(StandardTypes.DOUBLE)
-
public
static
void
output
(LongAndDoubleState state, BlockBuilder out) {
-
//最终输出结果到一个 BlockBuilder。
-
long
count
= state.getLong();
-
if (count ==
0) {
-
out.appendNull();
-
}
else {
-
double
value
= state.getDouble();
-
DoubleType.DOUBLE.writeDouble(out, value / count);
-
}
-
}
-
}
以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:
-
package com.lansonjy.prestocode;
-
-
import com.facebook.presto.spi.function.AccumulatorState;
-
-
public
interface
LongAndDoubleState
extends
AccumulatorState {
-
long
getLong
();
-
-
void
setLong
(long value);
-
-
double
getDouble
();
-
-
void
setDouble
(double value);
-
}
2.2、在“MyFunctionPlugin”中注册UDAF
-
//Presto 注册自定义函数的类,此类需要继承Plugin接口
-
public
class
MyFunctionsPlugin
implements
Plugin {
-
@Override
-
public Set<Class<?>> getFunctions()
-
{
-
return ImmutableSet.<Class<?>>builder()
-
//注册UDF,这里填写对应的UDF类
-
.add(MyUDF.class)
-
//注册UDAF,这里填写对应的UDAF 类
-
.add(MyUDAF.class)
-
.build();
-
}
-
}
2.3、打包,上传到各个Presto
将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。
2.4、在presto中执行如下命令
-
#登录Presto客户端
-
[root@node3 presto-0.259]
# ./presto --server node3:8080 --catalog mysql --schema presto_db
-
-
#查看函数
-
presto:presto_db> show
functions;

-
#执行聚合查询
-
presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;

二、Presto JDBC连接
使用JDBC连接Presto需要在项目中导入以下依赖:
-
<dependency>
-
<groupId>io.prestosql
</groupId>
-
<artifactId>presto-jdbc
</artifactId>
-
<version>312
</version>
-
</dependency>
JDBC连接代码如下:
-
public
class
ReadDataFromPresto {
-
public
static
void
main
(String[] args)
throws ClassNotFoundException, SQLException {
-
Connection
conn
= DriverManager.getConnection(
"jdbc:presto://node3:8080/mysql/presto_db",
"root",
null);
-
Statement
stmt
= conn.createStatement();
-
ResultSet
rs
= stmt.executeQuery(
"select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
-
while (rs.next()) {
-
String
pkgName
= rs.getString(
"pkg_name");
-
double
totalAmount
= rs.getDouble(
"total_amount");
-
System.out.println(
"pkgName = "+pkgName+
",totalAmount="+totalAmount);
-
}
-
rs.close();
-
conn.close();
-
}
-
}
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
转载:https://blog.csdn.net/xiaoweite1/article/details/127827866
