- Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义 UDF来方便的扩展。
- 当 Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数。
1. 自定义函数种类
虽然hive中为我们提供了很多的内置函数,但是在实际工作中,有些情况下hive提供的内置函数无法满足我们的需求,就需要我们自己来手动编写,所以就有了自定义函数 UDF。
UDF分为三种,分别如下:
UDF(User-Defined-Function):
一进一出(输入一行,输出一行),
输入一行数据, 输出一行数据; 比如:upper()、lowser()等。
UDAF(User-Defined Aggregation Funcation)
,多进一出(输入多行,输出一行),
输入多行数据, 聚合成一行数据, 比如:avg()、sum()等。
UDTF(User-Defined Table-Generating Functions)
,一进多出(输入一行,输出多行),
比如:collect_set()、collect_list()等。
官方文档:https://cwiki.apache.org/confluence/display/Hive/HivePlugins
注意: 使用自定义函数需要引入hive-exec的依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.2.1</version>
</dependency>
2. 编写自定义UDF函数
UDF, 输入数据是一行数据, 输出数据也是一行数据,
在此, 我们编写UDF实现给定字符串, 返回字符串的长度, my_len();
2.1 编写UDF的步骤
- 继承
org.apache.hadoop.hive.ql.udf.generic.*GenericUDF*
- 之前的版本是继承UDF类, 这个类已经过时了.
-
从GenericUDF, 继承了三个方法, 进行重写
-
- , 设置在explain执行计划中显示的语句.
-
打包, 本地部署就放到本地任意目录, 非本地模式, 就放入到共享存储, 如HDFS上。
-
添加Jar包, 创建函数
-
- 在hive的命令行窗口创建函数
-
-
- 添加jar
-
-
-
-
add jar /路径
-
-
-
-
- 创建function
-
-
-
-
create temporary function 函数名 as "自定义udf类的全类名"
- 临时函数用于解决一些临时特殊的业务需求而开发的函数,hive中注册的临时函数只在当前会话可用,注册函数的时候用temporary关键字声名。
-
-
-
使用函数
-
- 在hive的命令行窗口使用函数;
-
-
-
删除函数
-
Drop [temporary] function [if exists] [dbname.]function_name;
2.2 参考代码
//import org.apache.hadoop.hive.ql.exec.UDF; 已经过期
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class GetLen extends GenericUDF {
@Override
//校验数据参数个数等初始化操作
public ObjectInspector initialize(ObjectInspector[] arguements) throws UDFArgumentException {
if(arguements.length != 1){
throw new UDFArgumentException("错误, 输入参数不为1 !");
}
//函数本身返回值为int,需要返回int类型的鉴别器对象, 记不住的话就去GenericUDF中找一个方法参考下写法即可
return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}
/**
*函数的逻辑处理
* @param arguements 函数的参数
* @return 返回值
* @throws HiveException
*/
@Override
public Object evaluate(DeferredObject[] arguements) throws HiveException {
//1. 取出输入的参数值, 转为string
String str = arguements[0].get().toString();
//返回长度, 为了避免null 的string还要进行一个判断
return str == null ? 0 : str.length();
}
@Override
public String getDisplayString(String[] strings) {
return null;
}
}
3. 编写自定义UDTF函数
UDTF, 输入一行数据, 输出多行数据
在此, 我们实现一个字符串切割函数, 给定一行字符串, 按照指定的分隔符进行切分, 并输出。
3.1 编写UDTF的步骤
-
继承
org.apache.hadoop.hive.ql.udf.generic.*GenericUDTF*
-
- 之前的版本是继承UDTF类, 这个类已经过时了.
-
从GenericUDTF, 继承了三个方法, 进行重写
-
-
打包, 本地部署就放到本地任意目录, 非本地模式, 就放入到共享存储, 如HDFS上。
-
添加Jar包, 创建函数
-
- 在hive的命令行窗口创建函数
-
-
- 添加jar
-
-
-
-
add jar /路径
-
-
-
-
- 创建function
-
-
-
-
create temporary function 函数名 as "自定义udf类的全类名"
- 临时函数用于解决一些临时特殊的业务需求而开发的函数,hive中注册的临时函数只在当前会话可用,注册函数的时候用temporary关键字声名。
-
-
-
使用函数
-
- 在hive的命令行窗口使用函数;
-
-
-
删除函数
-
Drop [temporary] function [if exists] [dbname.]function_name;
3.2 参考代码
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
public class SplitString extends GenericUDTF {
//全局变量, 用于存储要写出个每一个单词
private List<String> outputList = null;
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//SQL 查询输出数据的默认列名
List<String> fieldsName = new ArrayList<>();
fieldsName.add("word");
//输出数据的类型
List<ObjectInspector> fieldsOIs = new ArrayList<>();
fieldsOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
//最终返回值
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldsName, fieldsOIs);
}
//处理输入数据的方法
@Override
public void process(Object[] arguments) throws HiveException {
//1. 获取第一个参数, 也就是待分割字符串
String targetString = arguments[0].toString();
//2. 获取第二个参数, 也就是切分字符
String splitKey = arguments[1].toString();
//3. 执行分割
String[] strings = targetString.split(splitKey);
//一进多出, 就像是使用map或者reduce过程的写出 context.write(key,value)
// 只不过hive中是forward方法
for(String str : strings){
//要输出的是一个个的单词, 每次遍历都把这个单词放入list
//然后使用forward方法写出
outputList.clear();
outputList.add(str);
forward(outputList);
}
}
//收尾方法
@Override
public void close() throws HiveException {
}
}
补充文章:
https://blog.csdn.net/HG_Harvey/article/details/77688735
https://cloud.tencent.com/developer/article/1733568
转载:https://blog.csdn.net/nmsLLCSDN/article/details/125833600