1. Background
The goal is to use the latest clickhouse-jdbc driver so that Waterdrop can import bitmap-type data into ClickHouse.
2. Implementation
a. Config file adm_dmp_clickhouse_jdbc.conf
spark {
  # Waterdrop defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "dmp#tranadm.adm_user_id_dist_pre"
  spark.ui.port = 13000
  spark.dynamicAllocation.enabled = false
  spark.executor.instances = 9
  spark.executor.cores = 4
  spark.executor.memory = "28g"
  spark.default.parallelism = 72
  spark.sql.shuffle.partitions = 72
}

input {
  hive {
    pre_sql = "select 'code' as code, label_value as value, from_unixtime(unix_timestamp(dt,'yyyyMMdd'),'yyyy-MM-dd') as dt, cast('123456' AS LONG) as version, collect_list(cast(id as int)) as id_bitmap from tranadm.adm_audc_user_base_label_string_inner_pro where dt='20221228' and label_name='country' and label_value is not null group by label_value,dt"
    result_table_name = "adm_dmp_user_id_dist"
  }
}

filter {
}

output {
  clickhousebitmap {
    save_mode = "overwrite"
    host = "${clickhouse_urls}"
    clickhouse.socket_timeout = 100000
    database = "${desc_ck_db}"
    table = "${desc_ck_table}"
    fields = ["code","value","dt","version","id_bitmap"]
    username = ""
    password = ""
    bulk_size = 5000000
  }
}
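For orientation, the rows produced by pre_sql line up with the fields list above. A sketch of the expected Spark schema, reconstructed from the query (not taken from the original post):

// Expected shape of result_table_name = "adm_dmp_user_id_dist",
// reconstructed from pre_sql for illustration:
import org.apache.spark.sql.types._

val expectedSchema = StructType(Seq(
  StructField("code", StringType),                  // literal 'code'
  StructField("value", StringType),                 // label_value
  StructField("dt", StringType),                    // from_unixtime(...) yields a string
  StructField("version", LongType),                 // cast('123456' AS LONG)
  StructField("id_bitmap", ArrayType(IntegerType))  // collect_list(cast(id as int))
))

On the ClickHouse side, the id_bitmap column is expected to have type AggregateFunction(groupBitmap, UInt32), which is exactly the case the writer code below matches on.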
b. Code logic
On the Spark side, the collect_list result arrives as an array column, read in the code below as mutable.WrappedArray[Int] (the ids are cast to int in pre_sql).
import java.math.BigDecimal
import java.sql.PreparedStatement

import scala.collection.mutable

import org.apache.spark.sql.Row
import org.roaringbitmap.RoaringBitmap

// ClickHouseBitmap and ClickHouseDataType come from the clickhouse-jdbc
// driver; the exact package differs between driver versions.

/**
 * Convert an array of ids into a RoaringBitmap.
 */
private def getBitMapById(list_id: mutable.WrappedArray[Int]): RoaringBitmap = {
  if (list_id.length == 0) {
    new RoaringBitmap()
  } else {
    val startBitMap = RoaringBitmap.bitmapOf(list_id(0))
    if (list_id.length > 1) {
      for (i <- 1 to list_id.length - 1) {
        startBitMap.add(list_id(i))
      }
    }
    startBitMap
  }
}
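A quick check of the helper (the sample ids here are invented for illustration):

// Hypothetical usage of getBitMapById; sample values are made up.
val ids: mutable.WrappedArray[Int] = Array(1, 5, 7)
val bm = getBitMapById(ids)
println(bm.contains(5))    // true
println(bm.getCardinality) // 3

Since RoaringBitmap.bitmapOf accepts varargs, the helper could also be collapsed to RoaringBitmap.bitmapOf(list_id.toArray: _*); the explicit loop above preserves the original code's shape.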
private def renderBaseTypeStatement(
    index: Int,
    fieldIndex: Int,
    fieldType: String,
    item: Row,
    statement: PreparedStatement): Unit = {
  fieldType match {
    case "DateTime" | "Date" | "String" =>
      statement.setString(index + 1, item.getAs[String](fieldIndex))
    case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
      statement.setInt(index + 1, item.getAs[Int](fieldIndex))
    case "UInt32" | "UInt64" | "Int64" =>
      statement.setLong(index + 1, item.getAs[Long](fieldIndex))
    case "Float32" => statement.setFloat(index + 1, item.getAs[Float](fieldIndex))
    case "Float64" => statement.setDouble(index + 1, item.getAs[Double](fieldIndex))
    case "Decimal" => statement.setBigDecimal(index + 1, item.getAs[BigDecimal](fieldIndex))
    case "AggregateFunction(groupBitmap, UInt32)" =>
      // Convert the collect_list array into a RoaringBitmap and wrap it so the
      // clickhouse-jdbc driver can serialize it into the bitmap column.
      val value = item.getAs[mutable.WrappedArray[Int]](fieldIndex)
      val bitmap = getBitMapById(value)
      statement.setObject(index + 1, ClickHouseBitmap.wrap(bitmap, ClickHouseDataType.UInt32))
    case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
  }
}
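For context, here is a minimal sketch of how renderBaseTypeStatement might be driven when flushing rows in batches; the loop and the names writeBatch, fieldTypes, and bulkSize are my assumptions for illustration, not Waterdrop's actual implementation:

// Sketch of a batch-write loop around renderBaseTypeStatement.
// fieldTypes holds the ClickHouse type string for each field in the `fields`
// list, in order; bulkSize corresponds to bulk_size in the config.
private def writeBatch(rows: Iterator[Row],
                       statement: PreparedStatement,
                       fieldTypes: Array[String],
                       bulkSize: Int): Unit = {
  var count = 0
  for (row <- rows) {
    for (i <- fieldTypes.indices) {
      // i doubles as the JDBC parameter index and the Row field index here,
      // since the fields list and the Row share the same column order.
      renderBaseTypeStatement(i, i, fieldTypes(i), row, statement)
    }
    statement.addBatch()
    count += 1
    if (count % bulkSize == 0) statement.executeBatch()
  }
  statement.executeBatch() // flush the remainder
}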
3. Problems encountered
The newer clickhouse-jdbc driver depends on RoaringBitmap 0.9.10 (RoaringBitmap-0.9.10.jar), while the company's Spark 2.4.6 cluster bundles an older RoaringBitmap that is missing methods the newer driver calls. In a hand-written JDBC Spark job that writes to ClickHouse, the following spark-submit configuration makes the application load the high-version RoaringBitmap ahead of the cluster's copy:
--deploy-mode cluster \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--jars /home/xizhi.wu/waterdrop_1_5_1/waterdrop-1.5.1/lib/RoaringBitmap-0.9.10.jar
In the sbt project, however, I could never get this to work.
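A workaround I have not verified against this setup, but which is commonly used for exactly this kind of dependency clash, is to shade RoaringBitmap inside the fat jar with sbt-assembly, so the renamed 0.9.10 classes can never collide with the older copy that Spark 2.4.6 ships:

// build.sbt — sketch of an sbt-assembly shading rule (assumes the
// sbt-assembly plugin is already added in project/plugins.sbt):
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.roaringbitmap.**" -> "shaded.org.roaringbitmap.@1").inAll
)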
Reposted from: https://blog.csdn.net/wuxizhi777/article/details/128605040