小言_互联网的博客

Problems encountered adding bitmap-type data import to Waterdrop 1.5.1


1. Background

The goal was to use the latest ClickHouse JDBC driver so that Waterdrop can import bitmap-type data.

2. Implementation

a. Configuration file adm_dmp_clickhouse_jdbc.conf


   
    spark {
      # Waterdrop defined streaming batch duration in seconds
      spark.streaming.batchDuration = 5
      spark.sql.catalogImplementation = "hive"
      spark.app.name = "dmp#tranadm.adm_user_id_dist_pre"
      spark.ui.port = 13000
      spark.dynamicAllocation.enabled = false
      spark.executor.instances = 9
      spark.executor.cores = 4
      spark.executor.memory = "28g"
      spark.default.parallelism = 72
      spark.sql.shuffle.partitions = 72
    }
    input {
      hive {
        pre_sql = "select 'code' as code, label_value as value, from_unixtime(unix_timestamp(dt,'yyyyMMdd'),'yyyy-MM-dd') as dt, cast('123456' AS LONG) as version, collect_list(cast(id as int)) as id_bitmap from tranadm.adm_audc_user_base_label_string_inner_pro where dt='20221228' and label_name='country' and label_value is not null group by label_value,dt"
        result_table_name = "adm_dmp_user_id_dist"
      }
    }
    filter {
    }
    output {
      clickhousebitmap {
        save_mode = "overwrite"
        host = "${clickhouse_urls}"
        clickhouse.socket_timeout = 100000
        database = "${desc_ck_db}"
        table = "${desc_ck_table}"
        fields = ["code","value","dt","version","id_bitmap"]
        username = ""
        password = ""
        bulk_size = 5000000
      }
    }

b. Code logic

The result of collect_list arrives in Spark as a WrappedArray (the SQL above casts id to int, so the element type here is Int).


   
    /**
     * Convert an array of ids into a bitmap.
     */
    private def getBitMapById(list_id: mutable.WrappedArray[Int]): RoaringBitmap = {
      if (list_id.length == 0) {
        new RoaringBitmap()
      } else {
        val startBitMap = RoaringBitmap.bitmapOf(list_id(0))
        if (list_id.length > 1) {
          for (i <- 1 to list_id.length - 1) {
            startBitMap.add(list_id(i))
          }
        }
        startBitMap
      }
    }

   
    private def renderBaseTypeStatement(
        index: Int,
        fieldIndex: Int,
        fieldType: String,
        item: Row,
        statement: PreparedStatement): Unit = {
      fieldType match {
        case "DateTime" | "Date" | "String" =>
          statement.setString(index + 1, item.getAs[String](fieldIndex))
        case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
          statement.setInt(index + 1, item.getAs[Int](fieldIndex))
        case "UInt32" | "UInt64" | "Int64" =>
          statement.setLong(index + 1, item.getAs[Long](fieldIndex))
        case "Float32" => statement.setFloat(index + 1, item.getAs[Float](fieldIndex))
        case "Float64" => statement.setDouble(index + 1, item.getAs[Double](fieldIndex))
        case "Decimal" => statement.setBigDecimal(index + 1, item.getAs[BigDecimal](fieldIndex))
        case "AggregateFunction(groupBitmap, UInt32)" =>
          val value = item.getAs[mutable.WrappedArray[Int]](fieldIndex)
          val bitmap = getBitMapById(value)
          statement.setObject(index + 1, ClickHouseBitmap.wrap(bitmap, ClickHouseDataType.UInt32))
        case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
      }
    }
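The match above is essentially a lookup from ClickHouse column type names to JDBC setter calls. A stripped-down, testable version of that dispatch might look like the following (the returned strings are illustrative labels, not calls into the real PreparedStatement API):

```scala
// Map a ClickHouse column type to the name of the JDBC setter used for it,
// mirroring the match in renderBaseTypeStatement. Returning a label instead
// of calling PreparedStatement lets the dispatch be unit-tested in isolation.
object SetterDispatch {
  def setterFor(fieldType: String): String = fieldType match {
    case "DateTime" | "Date" | "String"                  => "setString"
    case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" => "setInt"
    case "UInt32" | "UInt64" | "Int64"                   => "setLong"
    case "Float32"                                       => "setFloat"
    case "Float64"                                       => "setDouble"
    case "Decimal"                                       => "setBigDecimal"
    case "AggregateFunction(groupBitmap, UInt32)"        => "setObject" // bitmap path
    case _                                               => "setString" // fallback
  }
}
```

Note that UInt32 maps to setLong rather than setInt: a 32-bit unsigned value can exceed Int.MaxValue, so it needs a 64-bit signed slot.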

3. Problems encountered

The new JDBC driver depends on RoaringBitmap 0.9.10:

RoaringBitmap-0.9.10.jar

The company's Spark cluster runs Spark 2.4.6, which ships an older RoaringBitmap that lacks methods used by the newer version.

When writing to ClickHouse with my own hand-rolled JDBC Spark job, the following spark-submit configuration was used:

--deploy-mode cluster \

--conf spark.executor.userClassPathFirst=true \

--conf spark.driver.userClassPathFirst=true \

--jars /home/xizhi.wu/waterdrop_1_5_1/waterdrop-1.5.1/lib/RoaringBitmap-0.9.10.jar

With userClassPathFirst enabled, the Spark job loads the higher-version RoaringBitmap supplied by the user ahead of the cluster's built-in copy.

In the sbt project, however, this approach never worked.
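One common workaround in an sbt build is to shade (relocate) the newer RoaringBitmap with sbt-assembly, so the fat jar's 0.9.10 classes can never collide with the cluster's older copy. A hedged sketch of what that might look like in build.sbt (the plugin version and the shaded package name are assumptions, not taken from the original project):

```scala
// build.sbt fragment — requires sbt-assembly in project/plugins.sbt, e.g.
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

// Depend on the RoaringBitmap version the new JDBC driver needs.
libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.9.10"

// Relocate org.roaringbitmap into a private package inside the assembly jar,
// so class lookups inside the jar resolve to the bundled 0.9.10 copy.
assembly / assemblyShadeRules := Seq(
  ShadeRule
    .rename("org.roaringbitmap.**" -> "shaded.org.roaringbitmap.@1")
    .inAll
)
```

With shading, the userClassPathFirst flags become unnecessary, since the relocated classes no longer share names with the cluster's RoaringBitmap.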


Reposted from: https://blog.csdn.net/wuxizhi777/article/details/128605040