小言_互联网的博客

大数据Presto(四):Presto自定义函数和JDBC连接

870人阅读  评论(0)

文章目录

Presto自定义函数和JDBC连接

一、Presto 自定义函数

1、​​​​​​​UDF函数

2、​​​​​​​UDAF函数

二、Presto JDBC连接


Presto自定义函数和JDBC连接

​​​​​​​一、Presto 自定义函数

我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。

1、​​​​​​​​​​​​​​UDF函数

自定义UDF函数及使用可以按照下面步骤来实现。

1.1、创建Maven项目,加入如下依赖


  
  1. <dependency>
  2. <groupId>com.facebook.presto </groupId>
  3. <artifactId>presto-spi </artifactId>
  4. <version>0.259 </version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.facebook.presto </groupId>
  8. <artifactId>presto-array </artifactId>
  9. <version>0.259 </version>
  10. </dependency>
  11. <dependency>
  12. <groupId>io.airlift </groupId>
  13. <artifactId>stats </artifactId>
  14. <version>0.163 </version>
  15. </dependency>
  16. <build>
  17. <plugins>
  18. <plugin>
  19. <artifactId>maven-assembly-plugin </artifactId>
  20. <version>2.4 </version>
  21. <configuration>
  22. <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
  23. <!--<appendAssemblyId>false</appendAssemblyId>-->
  24. <descriptorRefs>
  25. <descriptorRef>jar-with-dependencies </descriptorRef>
  26. </descriptorRefs>
  27. <archive>
  28. <manifest>
  29. <mainClass>com.lw.java.myflink.Streaming.example.FlinkReadSocketData </mainClass>
  30. </manifest>
  31. </archive>
  32. </configuration>
  33. <executions>
  34. <execution>
  35. <id>make-assembly </id>
  36. <phase>package </phase>
  37. <goals>
  38. <goal>assembly </goal>
  39. </goals>
  40. </execution>
  41. </executions>
  42. </plugin>
  43. </plugins>
  44. </build>

1.2、创建Presto注册插件类


  
  1. package com.lansonjy.prestocode;
  2. import com.facebook.presto.spi.Plugin;
  3. import com.google.common.collect.ImmutableSet;
  4. import java.util.Set;
  5. //Presto 注册自定义函数的类,此类需要继承Plugin接口
  6. public class MyFunctionsPlugin implements Plugin {
  7. @Override
  8. public Set<Class<?>> getFunctions()
  9. {
  10. return ImmutableSet.<Class<?>>builder()
  11. //注册UDF,这里填写对应的UDF类
  12. .add(MyUDF.class)
  13. .build();
  14. }
  15. }

1.3、创建“MyUDF”类,实现自定义UDF逻辑

这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:


  
  1. package com.lansonjy.prestocode;
  2. import com.facebook.presto.spi.function.Description;
  3. import com.facebook.presto.spi.function.ScalarFunction;
  4. import com.facebook.presto.spi.function.SqlType;
  5. import com.facebook.presto.spi.type.StandardTypes;
  6. import io.airlift.slice.Slice;
  7. import io.airlift.slice.Slices;
  8. //自定义UDF函数
  9. public class MyUDF {
  10. //自定义UDF函数使用时的名称
  11. @ScalarFunction("myudf")
  12. //函数的描述
  13. @Description("转换字母大写为小写")
  14. //指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
  15. @SqlType(StandardTypes.VARCHAR)
  16. public static Slice lowercase (@SqlType(StandardTypes.VARCHAR) Slice in)
  17. {
  18. String argument = in.toStringUtf8();
  19. return Slices.utf8Slice(argument.toLowerCase());
  20. }
  21. }

1.4、创建“resources”资源目录

在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:

com.lansonjy.prestocode.MyFunctionsPlugin

1.5、将项目打包,上传到Presto集群

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

1.6、使用自定义UDF函数


  
  1. #登录Presto客户端
  2. ./presto --server node3:8080 --catalog mysql --schema presto_db
  3. #查询所有函数
  4. presto:presto_db> show functions;


  
  1. #使用这个函数查询转换数据
  2. presto:presto_db> select myudf( 'ABCDEF');
  3. _col0
  4. --------
  5. abcdef
  6. (1 row)

2、​​​​​​​UDAF函数

UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:

2.1、在项目中创建“MyUDAF”类


  
  1. package com.lansonjy.prestocode;
  2. import com.facebook.presto.spi.block.BlockBuilder;
  3. import com.facebook.presto.spi.function.*;
  4. import com.facebook.presto.spi.type.DoubleType;
  5. import com.facebook.presto.spi.type.StandardTypes;
  6. //presto 自定义聚合函数实现-实现平均数计算
  7. //自定义聚合函数使用时的名称
  8. @AggregationFunction("myudaf")
  9. //自定义聚合函数注释
  10. @Description("我的自定义聚合函数,实现计算平均数")
  11. public class MyUDAF {
  12. //输入数据注释
  13. @InputFunction
  14. public static void input (LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
  15. //针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
  16. state.setLong(state.getLong() + 1);
  17. state.setDouble(state.getDouble() + value);
  18. }
  19. //聚合数据注释
  20. @CombineFunction
  21. public static void combine (LongAndDoubleState state, LongAndDoubleState otherState) {
  22. //将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
  23. state.setLong(state.getLong() + otherState.getLong());
  24. state.setDouble(state.getDouble() + otherState.getDouble());
  25. }
  26. //输出数据注释
  27. @OutputFunction(StandardTypes.DOUBLE)
  28. public static void output (LongAndDoubleState state, BlockBuilder out) {
  29. //最终输出结果到一个 BlockBuilder。
  30. long count = state.getLong();
  31. if (count == 0) {
  32. out.appendNull();
  33. } else {
  34. double value = state.getDouble();
  35. DoubleType.DOUBLE.writeDouble(out, value / count);
  36. }
  37. }
  38. }

以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:


  
  1. package com.lansonjy.prestocode;
  2. import com.facebook.presto.spi.function.AccumulatorState;
  3. public interface LongAndDoubleState extends AccumulatorState {
  4. long getLong ();
  5. void setLong (long value);
  6. double getDouble ();
  7. void setDouble (double value);
  8. }

2.2、在“MyFunctionPlugin”中注册UDAF


  
  1. //Presto 注册自定义函数的类,此类需要继承Plugin接口
  2. public class MyFunctionsPlugin implements Plugin {
  3. @Override
  4. public Set<Class<?>> getFunctions()
  5. {
  6. return ImmutableSet.<Class<?>>builder()
  7. //注册UDF,这里填写对应的UDF类
  8. .add(MyUDF.class)
  9. //注册UDAF,这里填写对应的UDAF 类
  10. .add(MyUDAF.class)
  11. .build();
  12. }
  13. }

2.3、打包,上传到各个Presto

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

2.4、在presto中执行如下命令


  
  1. #登录Presto客户端
  2. [root@node3 presto-0.259] # ./presto --server node3:8080 --catalog mysql --schema presto_db
  3. #查看函数
  4. presto:presto_db> show functions;


  
  1. #执行聚合查询
  2. presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;

 

二、Presto JDBC连接

使用JDBC连接Presto需要在项目中导入以下依赖:


  
  1. <dependency>
  2. <groupId>io.prestosql </groupId>
  3. <artifactId>presto-jdbc </artifactId>
  4. <version>312 </version>
  5. </dependency>

JDBC连接代码如下:


  
  1. public class ReadDataFromPresto {
  2. public static void main (String[] args) throws ClassNotFoundException, SQLException {
  3. Connection conn = DriverManager.getConnection( "jdbc:presto://node3:8080/mysql/presto_db", "root", null);
  4. Statement stmt = conn.createStatement();
  5. ResultSet rs = stmt.executeQuery( "select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
  6. while (rs.next()) {
  7. String pkgName = rs.getString( "pkg_name");
  8. double totalAmount = rs.getDouble( "total_amount");
  9. System.out.println( "pkgName = "+pkgName+ ",totalAmount="+totalAmount);
  10. }
  11. rs.close();
  12. conn.close();
  13. }
  14. }

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

转载:https://blog.csdn.net/xiaoweite1/article/details/127827866
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场