小言_互联网的博客

Flink SQL 结合 HiveCatalog 使用

450人阅读  评论(0)

Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.

sql-client-defaults.yaml 配置


   
  1. catalogs:
  2.    - name: myhive
  3.       type: hive
  4.      hive-conf-dir: /home/jason/bigdata/hive/hive -2.3 .4
  5.       default-database: mydatabase

添加依赖


   
  1. -rw-r--r--.  1 root root      42998 Jul  22   2020 flink-connector-filesystem_2 .11 -1.11 .1.jar
  2. -rw-r--r--.  1 root root     196416 Dec  11  17: 51 flink-connector-jdbc_2 .11 -1.12 .0.jar
  3. -rw-r--r--.  1 root root      91553 Dec   2  17: 46 flink-csv -1.12 .0.jar
  4. -rw-r--r--.  1 root root  114120165 Dec   2  17: 50 flink-dist_2 .11 -1.12 .0.jar
  5. -rw-r--r--.  1 root root     136663 Dec   2  17: 46 flink-json -1.12 .0.jar
  6. -rw-r--r--.  1 root root   43317025 Dec  11  12: 44 flink-shaded-hadoop -2-uber -2.8 .3 -10.0.jar
  7. -rw-r--r--.  1 root root    7709741 Sep  30  01: 49 flink-shaded-zookeeper -3.4 .14.jar
  8. -rw-r--r--.  1 root root    3309441 Dec  12  15: 35 flink-sql-avro -1.12 .0.jar
  9. -rw-r--r--.  1 root root   40650306 Dec  19  12: 42 flink-sql-connector-hive -2.3 .6_2 .11 -1.12 .0.jar
  10. -rw-r--r--.  1 root root    3650212 Dec  11  14: 44 flink-sql-connector-kafka_2 .11 -1.12 .0.jar
  11. -rw-r--r--.  1 root root    2124047 Dec  12  15: 35 flink-sql-orc_2 .11 -1.12 .0.jar
  12. -rw-r--r--.  1 root root    5666201 Dec  12  15: 35 flink-sql-parquet_2 .11 -1.12 .0.jar
  13. -rw-r--r--.  1 root root   36147819 Dec   2  17: 49 flink-table_2 .11 -1.12 .0.jar
  14. -rw-r--r--.  1 root root   40286358 Dec   2  17: 49 flink-table-blink_2 .11 -1.12 .0.jar
  15. -rw-r--r--.  1 root root   34214106 Dec  19  19: 18 hive-exec -2.3 .4.jar
  16. -rw-r--r--.  1 root root      67114 Feb  22   2020 log4j -1.2-api -2.12 .1.jar
  17. -rw-r--r--.  1 root root     276771 Feb  22   2020 log4j-api -2.12 .1.jar
  18. -rw-r--r--.  1 root root    1674433 Feb  22   2020 log4j-core -2.12 .1.jar
  19. -rw-r--r--.  1 root root      23518 Feb  22   2020 log4j-slf4j-impl -2.12 .1.jar
  20. -rw-r--r--.  1 root root    1007502 Dec  11  17: 50 mysql-connector-java -5.1 .47.jar

代码里面使用 HiveCatalog


   
  1. package flink.ddl
  2. import java.time.ZoneOffset._
  3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  4. import org.apache.flink.table.api.EnvironmentSettings.newInstance
  5. import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create
  6. import org.apache.flink.table.catalog.hive.HiveCatalog
  7. import org.apache.flink.table.module.hive.HiveModule
  8. import org.apache.hadoop.hive.ql.exec.UDF
  9. import org.apache.hadoop.io.Text
  10. /**
  11.  * Flink SQL 使用 hive catalog
  12.  */
  13. object FlinkDDLHiveCatalog {
  14.     private val catalog_name =  "myhive"
  15.     private val hive_version =  "2.3.4"
  16.     def main(args: Array[String]): Unit = {
  17.         val env = StreamExecutionEnvironment.getExecutionEnvironment
  18.         val settings = newInstance()
  19.                 .useBlinkPlanner()
  20.                 .inStreamingMode()
  21.                 .build()
  22.         val tEnv = create(env, settings)
  23.         tEnv.getConfig.setLocalTimeZone(ofHours( 8))
  24.          // 设置 early fired
  25.         tEnv.getConfig().getConfiguration().setBoolean( "table.exec.emit.early-fire.enabled"true)
  26.         tEnv.getConfig().getConfiguration().setString( "table.exec.emit.early-fire.delay""5000 ms")
  27.          // 设置 job name
  28.         tEnv.getConfig.getConfiguration.setString( "pipeline.name",this.getClass.getSimpleName.replace( "$", ""))
  29.         val catalog =  new HiveCatalog(
  30.             catalog_name,                    // catalog name
  31.              "mydatabase",                 // default database
  32.              "/home/jason/bigdata/hive/hive-2.3.4",   // Hive config (hive-site.xml) directory
  33.             hive_version                    // Hive version
  34.         )
  35.          // 注册 catalog
  36.         tEnv.registerCatalog( "myhive", catalog)
  37.          // 选择一个 catalog
  38.         tEnv.useCatalog( "myhive")
  39.          // 选择 database
  40.         tEnv.useDatabase( "mydatabase")
  41.          // 加载 hive 的内置函数
  42.         tEnv.loadModule(catalog_name, new HiveModule(hive_version))
  43.          // kafka_source_jason 和 print_table 提前已经创建好可以直接使用
  44.         tEnv.executeSql(
  45.              "" "
  46.               |insert into print_table
  47.               |select
  48.               |lower(funcName),
  49.               |MIN(`timestamp`) as min_timestamp,
  50.               |FIRST_VALUE(`timestamp`) as first_timestamp,
  51.               |MAX(`timestamp`) as max_timestamp
  52.               |from kafka_source_jason
  53.               |group by funcName
  54.               |" "".stripMargin)
  55.     }
  56. }

因为 kafka_source_jason 和 print_table 这两张表提前已经创建过了,已经保存在 HiveCatalog 里面,所以代码里面可以直接使用不用再次创建.

提交任务

在启动任务之前,需要先把 Hiv e的 metastore 起起来,因为 HiveCatalog 会和 metastore 连接这样才能访问元数据信息.

hive --service metastore &

   
  1. flink run -d -m yarn-cluster \
  2. -c flink.ddl.FlinkDDLHiveCatalog \
  3. -yqu flink \
  4. -nm FlinkDDLHiveCatalog \
  5. -p  6 \
  6. /home/jason/bigdata/jar/flink -1.11 .1 -1.0-SNAPSHOT.jar 

打印结果

上面的代码还加载了 hive 的内置函数, Flink SQL 里面可以直接使用 hive 的内置函数, SQL 中的 lower 就是 hive 的函数可以直接拿来使用,这样就会非常的方便.

推荐阅读

Flink SQL 解析嵌套的 JSON 如此简单

JasonLee,公众号:JasonLee的博客Flink SQL 解析嵌套的 JSON 数据


转载:https://blog.csdn.net/xianpanjia4616/article/details/113065431
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场