Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.
sql-client-defaults.yaml 配置
-
catalogs:
-
- name: myhive
-
type: hive
-
hive-conf-dir: /home/jason/bigdata/hive/hive
-2.3
.4
-
default-database: mydatabase
添加依赖
-
-rw-r--r--.
1 root root
42998 Jul
22
2020 flink-connector-filesystem_2
.11
-1.11
.1.jar
-
-rw-r--r--.
1 root root
196416 Dec
11
17:
51 flink-connector-jdbc_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
91553 Dec
2
17:
46 flink-csv
-1.12
.0.jar
-
-rw-r--r--.
1 root root
114120165 Dec
2
17:
50 flink-dist_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
136663 Dec
2
17:
46 flink-json
-1.12
.0.jar
-
-rw-r--r--.
1 root root
43317025 Dec
11
12:
44 flink-shaded-hadoop
-2-uber
-2.8
.3
-10.0.jar
-
-rw-r--r--.
1 root root
7709741 Sep
30
01:
49 flink-shaded-zookeeper
-3.4
.14.jar
-
-rw-r--r--.
1 root root
3309441 Dec
12
15:
35 flink-sql-avro
-1.12
.0.jar
-
-rw-r--r--.
1 root root
40650306 Dec
19
12:
42 flink-sql-connector-hive
-2.3
.6_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
3650212 Dec
11
14:
44 flink-sql-connector-kafka_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
2124047 Dec
12
15:
35 flink-sql-orc_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
5666201 Dec
12
15:
35 flink-sql-parquet_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
36147819 Dec
2
17:
49 flink-table_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
40286358 Dec
2
17:
49 flink-table-blink_2
.11
-1.12
.0.jar
-
-rw-r--r--.
1 root root
34214106 Dec
19
19:
18 hive-exec
-2.3
.4.jar
-
-rw-r--r--.
1 root root
67114 Feb
22
2020 log4j
-1.2-api
-2.12
.1.jar
-
-rw-r--r--.
1 root root
276771 Feb
22
2020 log4j-api
-2.12
.1.jar
-
-rw-r--r--.
1 root root
1674433 Feb
22
2020 log4j-core
-2.12
.1.jar
-
-rw-r--r--.
1 root root
23518 Feb
22
2020 log4j-slf4j-impl
-2.12
.1.jar
-
-rw-r--r--.
1 root root
1007502 Dec
11
17:
50 mysql-connector-java
-5.1
.47.jar
代码里面使用 HiveCatalog
-
package flink.ddl
-
-
import java.time.ZoneOffset._
-
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
import org.apache.flink.table.api.EnvironmentSettings.newInstance
-
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create
-
import org.apache.flink.table.catalog.hive.HiveCatalog
-
import org.apache.flink.table.module.hive.HiveModule
-
import org.apache.hadoop.hive.ql.exec.UDF
-
import org.apache.hadoop.io.Text
-
-
/**
-
* Flink SQL 使用 hive catalog
-
*/
-
object FlinkDDLHiveCatalog {
-
-
private val catalog_name =
"myhive"
-
private val hive_version =
"2.3.4"
-
-
def main(args: Array[String]): Unit = {
-
val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-
val settings = newInstance()
-
.useBlinkPlanner()
-
.inStreamingMode()
-
.build()
-
-
val tEnv = create(env, settings)
-
tEnv.getConfig.setLocalTimeZone(ofHours(
8))
-
-
// 设置 early fired
-
tEnv.getConfig().getConfiguration().setBoolean(
"table.exec.emit.early-fire.enabled",
true)
-
tEnv.getConfig().getConfiguration().setString(
"table.exec.emit.early-fire.delay",
"5000 ms")
-
-
// 设置 job name
-
tEnv.getConfig.getConfiguration.setString(
"pipeline.name",this.getClass.getSimpleName.replace(
"$",
""))
-
-
val catalog =
new HiveCatalog(
-
catalog_name,
// catalog name
-
"mydatabase",
// default database
-
"/home/jason/bigdata/hive/hive-2.3.4",
// Hive config (hive-site.xml) directory
-
hive_version
// Hive version
-
)
-
-
// 注册 catalog
-
tEnv.registerCatalog(
"myhive", catalog)
-
// 选择一个 catalog
-
tEnv.useCatalog(
"myhive")
-
// 选择 database
-
tEnv.useDatabase(
"mydatabase")
-
// 加载 hive 的内置函数
-
tEnv.loadModule(catalog_name,
new HiveModule(hive_version))
-
-
// kafka_source_jason 和 print_table 提前已经创建好可以直接使用
-
tEnv.executeSql(
-
""
"
-
|insert into print_table
-
|select
-
|lower(funcName),
-
|MIN(`timestamp`) as min_timestamp,
-
|FIRST_VALUE(`timestamp`) as first_timestamp,
-
|MAX(`timestamp`) as max_timestamp
-
|from kafka_source_jason
-
|group by funcName
-
|"
"".stripMargin)
-
}
-
}
因为 kafka_source_jason 和 print_table 这两张表提前已经创建过了,已经保存在 HiveCatalog 里面,所以代码里面可以直接使用不用再次创建.
提交任务
在启动任务之前,需要先把 Hiv e的 metastore 起起来,因为 HiveCatalog 会和 metastore 连接这样才能访问元数据信息.
hive --service metastore &
-
flink run -d -m yarn-cluster \
-
-c flink.ddl.FlinkDDLHiveCatalog \
-
-yqu flink \
-
-nm FlinkDDLHiveCatalog \
-
-p
6 \
-
/home/jason/bigdata/jar/flink
-1.11
.1
-1.0-SNAPSHOT.jar
打印结果
上面的代码还加载了 hive 的内置函数, Flink SQL 里面可以直接使用 hive 的内置函数, SQL 中的 lower 就是 hive 的函数可以直接拿来使用,这样就会非常的方便.
推荐阅读
Flink SQL 解析嵌套的 JSON 如此简单
JasonLee,公众号:JasonLee的博客Flink SQL 解析嵌套的 JSON 数据
转载:https://blog.csdn.net/xianpanjia4616/article/details/113065431
查看评论