Create the HBase table (via Phoenix)
create table gmall_user_info (id varchar primary key, info.user_level varchar, info.birthday varchar, info.gender varchar, info.age_group varchar, info.gender_name varchar) SALT_BUCKETS = 3;
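SALT_BUCKETS = 3 pre-splits the salted table into three buckets so that writes do not hot-spot a single region. A quick way to confirm the table was created is a count query over the Phoenix JDBC driver; a minimal sketch, assuming the Phoenix client jar is on the classpath and the ZooKeeper quorum hadoop102,hadoop103,hadoop104:2181 used later in this post:

import java.sql.DriverManager

object GmallUserInfoSmokeTest {
  def main(args: Array[String]): Unit = {
    // Phoenix JDBC URL format: jdbc:phoenix:<zookeeper quorum>
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val rs = conn.createStatement().executeQuery("select count(*) from GMALL_USER_INFO")
    while (rs.next()) println("rows in GMALL_USER_INFO: " + rs.getLong(1))
    conn.close()
  }
}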
UserInfo
In scala\com\atguigu\gmall\realtime\bean\UserInfo.scala
case class UserInfo(id: String,
                    user_level: String,
                    birthday: String,
                    gender: String,
                    var age_group: String,
                    var gender_name: String)
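age_group and gender_name are declared as var because they do not exist in the source table; UserInfoApp below fills them in from birthday and gender before writing to Phoenix. A small sketch (not from the original post) that pulls that derivation out into a helper next to the bean, using the same 365-day-year approximation as the app:

import java.text.SimpleDateFormat

object UserInfoEnricher {
  // Mirrors the per-record logic in UserInfoApp
  def enrich(userInfo: UserInfo): UserInfo = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd")
    val ageYears = (System.currentTimeMillis() - formatter.parse(userInfo.birthday).getTime) /
      1000L / 60L / 60L / 24L / 365L
    userInfo.age_group =
      if (ageYears <= 20) "20岁及以下"
      else if (ageYears > 30) "30岁以上"
      else "21岁到30岁"
    userInfo.gender_name = if (userInfo.gender == "M") "男" else "女"
    userInfo
  }
}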
UserInfoApp
import java.text.SimpleDateFormat
import java.util
import com.alibaba.fastjson.JSON
import com.atguigu.gmall.realtime.bean.UserInfo
import com.atguigu.gmall.realtime.utils.{MyKafkaUtil, OffsetManagerUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
object UserInfoApp {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dim_user_info_app")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topic = "ODS_T_USER_INFO";
val groupId = "gmall_user_info_group"
///////////////////// Offset handling ///////////////////////////
val offset: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(groupId, topic)
var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
// If a saved offset exists in Redis, resume consuming Kafka from it; otherwise start from Kafka's default (latest) offset
if (offset != null && offset.size > 0) {
inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId)
//startInputDstream.map(_.value).print(1000)
} else {
inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
}
// Capture the offset ranges of each batch
var offsetRanges: Array[OffsetRange] = null
val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
val userInfoDstream: DStream[UserInfo] = inputGetOffsetDstream.map { record =>
val userInfoJsonStr: String = record.value()
val userInfo: UserInfo = JSON.parseObject(userInfoJsonStr, classOf[UserInfo])
val formatter = new SimpleDateFormat("yyyy-MM-dd")
val date: util.Date = formatter.parse(userInfo.birthday)
val curTs: Long = System.currentTimeMillis()
val betweenMs = curTs - date.getTime // difference between the two timestamps in milliseconds
val age=betweenMs/1000L/60L/60L/24L/365L
if (age <= 20) {
userInfo.age_group = "20岁及以下"
} else if (age > 30) {
userInfo.age_group = "30岁以上"
} else {
userInfo.age_group = "21岁到30岁"
}
if(userInfo.gender=="M"){
userInfo.gender_name="男"
}else{
userInfo.gender_name="女"
}
userInfo
}
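// Write the enriched rows to Phoenix first and commit the Kafka offsets to Redis afterwards,
// so a failed batch is simply re-consumed (at-least-once with idempotent upserts)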
userInfoDstream.foreachRDD{rdd=>
import org.apache.phoenix.spark._
rdd.saveToPhoenix("GMALL_USER_INFO",Seq("ID", "USER_LEVEL", "BIRTHDAY", "GENDER","AGE_GROUP","GENDER_NAME")
,new Configuration,Some("hadoop102,hadoop103,hadoop104:2181"))
OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
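Both apps persist their Kafka offsets through OffsetManagerUtil, which the post does not list. A minimal Redis-backed sketch consistent with how it is called above; the Redis host/port and the hash key layout offset:<topic>:<groupId> are assumptions, not from the original project:

import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

object OffsetManagerUtil {
  // Assumed Redis location; adjust to the cluster's actual Redis instance
  private def client() = new Jedis("hadoop102", 6379)

  // Returns null when no offset has been saved yet, matching the check in the apps above
  def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {
    val jedis = client()
    val offsetMap: util.Map[String, String] = jedis.hgetAll("offset:" + topic + ":" + groupId)
    jedis.close()
    if (offsetMap == null || offsetMap.isEmpty) null
    else offsetMap.asScala.map { case (partition, offset) =>
      new TopicPartition(topic, partition.toInt) -> offset.toLong
    }.toMap
  }

  def saveOffset(groupId: String, topic: String, offsetRanges: Array[OffsetRange]): Unit = {
    if (offsetRanges != null && offsetRanges.nonEmpty) {
      val offsetMap = new util.HashMap[String, String]()
      for (offsetRange <- offsetRanges) {
        // untilOffset is where the next batch should resume
        offsetMap.put(offsetRange.partition.toString, offsetRange.untilOffset.toString)
      }
      val jedis = client()
      jedis.hmset("offset:" + topic + ":" + groupId, offsetMap)
      jedis.close()
    }
  }
}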
Initialize existing data with maxwell-bootstrap
bin/maxwell-bootstrap --user maxwell --password 123123 --host hadoop102 --database spark_gmall --table user_info --client_id maxwell_1
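maxwell-bootstrap replays all existing rows of spark_gmall.user_info through Maxwell. The replayed rows carry type "bootstrap-insert", and the bootstrap-start / bootstrap-complete markers carry no data, so the app that splits the Maxwell stream into the ODS_T_* topics (not shown in this post; see the commented-out splitting code at the end of OrderInfoApp) has to accept bootstrap rows as well as ordinary inserts and updates. A hedged filter sketch, assuming the standard Maxwell JSON layout with database / table / type / data fields:

import com.alibaba.fastjson.{JSON, JSONObject}

object MaxwellRecordFilter {
  // Keep ordinary changes plus bootstrap replays; drop the empty bootstrap markers
  private val usableTypes = Set("insert", "update", "bootstrap-insert")

  def isUsable(recordJson: String): Boolean = {
    val jsonObj: JSONObject = JSON.parseObject(recordJson)
    val dataObj: JSONObject = jsonObj.getJSONObject("data")
    usableTypes.contains(jsonObj.getString("type")) && dataObj != null && !dataObj.isEmpty
  }
}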
Modify OrderInfoApp
In scala\com\atguigu\gmall\realtime\app\dw\OrderInfoApp.scala
import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall.realtime.bean.{OrderInfo, UserState}
import com.atguigu.gmall.realtime.utils.{MyKafkaUtil, OffsetManagerUtil, PhoenixUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
object OrderInfoApp {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dw_order_info_app")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topic = "ODS_T_ORDER_INFO";
val groupId = "base_order_info_group"
///////////////////// Offset handling ///////////////////////////
val offset: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(groupId, topic)
var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
// If a saved offset exists in Redis, resume consuming Kafka from it; otherwise start from Kafka's default (latest) offset
if (offset != null && offset.size > 0) {
inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId)
//startInputDstream.map(_.value).print(1000)
} else {
inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
}
// Capture the offset ranges of each batch
var offsetRanges: Array[OffsetRange] = null
val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
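// offsetRanges is captured on the driver inside transform, before any output operation runs,
// so the exact ranges of this batch can be committed in foreachRDD after the data is written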
///////////////////// Business logic ///////////////////////////
// Basic conversion: derive the date fields from create_time
val orderInfoDstream: DStream[OrderInfo] = inputGetOffsetDstream.map { record =>
val jsonString: String = record.value()
val orderInfo: OrderInfo = JSON.parseObject(jsonString,classOf[OrderInfo])
val datetimeArr: Array[String] = orderInfo.create_time.split(" ")
//println(datetimeArr.toString + "---------------------------------------")
orderInfo.create_date=datetimeArr(0)
val timeArr: Array[String] = datetimeArr(1).split(":")
orderInfo.create_hour=timeArr(0)
orderInfo
}
// Use mapPartitions so the lookup SQL runs once per partition instead of once per record
val orderInfoWithfirstDstream: DStream[OrderInfo] = orderInfoDstream.mapPartitions { orderInfoItr =>
// Materialize the orders of this partition into a List
val orderInfoList: List[OrderInfo] = orderInfoItr.toList
if(orderInfoList.size>0){
// Collect all user ids of this partition into a List
val userIdList: List[String] = orderInfoList.map(_.user_id.toString)
// println(userIdList.size + "--------------------")
// Query the consumption state of all these user ids with a single SQL statement
val sql = "select user_id,if_consumed from user_state where user_id in ('" + userIdList.mkString("','") + "')"
// if_consumed tells us whether each user has purchased before (i.e. whether this order can be a first order)
val userStateList: List[JSONObject] = PhoenixUtil.queryList(sql)
// Existing state from the database
// Convert the List into a Map to avoid a nested loop
val userStateMap: Map[String, String] = userStateList.map(userStateJsonObj =>
// Note: Phoenix returns column names in upper case!
// Turn each query result into a (user_id, if_consumed) pair
(userStateJsonObj.getString("USER_ID"), userStateJsonObj.getString("IF_CONSUMED"))
).toMap
// Compare the incoming orders against the state already stored in the database
// Iterate over all orders of this partition
for (orderInfo <- orderInfoList) {
// Look up the user's if_consumed flag; null means the user has never purchased
val userIfConsumed: String = userStateMap.getOrElse(orderInfo.user_id.toString, null)
// Check whether the user has consumed before
if (userIfConsumed != null && userIfConsumed == "1") {
// The user has purchased before, so this order is not a first order
orderInfo.if_first_order = "0"
} else {
// No previous purchase: mark this order as a first order
orderInfo.if_first_order = "1"
}
}
}
orderInfoList.toIterator
}
// Fix: if a first-time buyer places several orders in the same batch, all of them would otherwise be flagged as first orders
// Re-key the stream as (user_id, orderInfo)
val orderInfoWithUidDstream: DStream[(Long, OrderInfo)] = orderInfoWithfirstDstream.map(orderInfo=>(orderInfo.user_id,orderInfo))
// Group the orders by user id
val orderInfoGroupbyUidDstream: DStream[(Long, Iterable[OrderInfo])] = orderInfoWithUidDstream.groupByKey()
// Flatten the groups back into a stream of individual orders
val orderInfoFinalFirstDstream: DStream[OrderInfo] = orderInfoGroupbyUidDstream.flatMap { case (userId, orderInfoItr) =>
// Materialize this user's orders into a List
val orderInfoList: List[OrderInfo] = orderInfoItr.toList
// Only groups flagged as first orders with more than one order need fixing
if (orderInfoList(0).if_first_order == "1" && orderInfoList.size > 1) {
// Sort this user's orders of the current batch by create_time
val orderInfoSortedList: List[OrderInfo] = orderInfoList.sortWith { (orderInfo1, orderInfo2) =>
(orderInfo1.create_time < orderInfo2.create_time)
}
// Keep the earliest order as the first order and mark all the others as non-first (0)
for (i <- 1 to orderInfoSortedList.size - 1) {
orderInfoSortedList(i).if_first_order = "0"
}
orderInfoSortedList.toIterator
} else {
orderInfoList.toIterator
}
}
//////////////// Join the province dimension /////////////////
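// The transform body runs on the driver once per batch, so the province dimension is
// re-queried from Phoenix and re-broadcast every batch and dimension changes are picked up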
val orderInfoWithProvinceDstream: DStream[OrderInfo] = orderInfoFinalFirstDstream.transform {rdd =>
// driver
val sql = "select id,name,region_id,area_code from gmall_province_info"
// Run the SQL and get the provinces as a List[JSONObject]
val provinceJsonObject: List[JSONObject] = PhoenixUtil.queryList(sql)
// Convert the List into a Map keyed by province id
val provinceJsonObjMap: Map[Long, JSONObject] = provinceJsonObject.map {
JsonObj =>
(JsonObj.getLongValue("ID"), JsonObj)
}.toMap
// Broadcast the province map to the executors
val provinceJsonObjMapBc: Broadcast[Map[Long, JSONObject]] = ssc.sparkContext.broadcast(provinceJsonObjMap)
// executor
val orderInfoWithProvinceRDD: RDD[OrderInfo] = rdd.mapPartitions { orderInfoItr =>
// Read the broadcast value on the executor: a Map[Long, JSONObject] of provinces
val provinceJsonObjMap: Map[Long, JSONObject] = provinceJsonObjMapBc.value
// An iterator can only be traversed once, so materialize it into a List first
val orderInfoList: List[OrderInfo] = orderInfoItr.toList
// Look up the province of every order
for (orderInfo <- orderInfoList) {
// Look the province up in the broadcast map; null if it is missing
val provinceJsonObj: JSONObject = provinceJsonObjMap.getOrElse(orderInfo.province_id, null)
// Only fill in the fields when the province exists
if (provinceJsonObj != null) {
// Copy the province name and area code onto the order
orderInfo.province_name = provinceJsonObj.getString("NAME")
println(orderInfo.province_name + "-----------")
orderInfo.province_area_code = provinceJsonObj.getString("AREA_CODE")
}
}
orderInfoList.toIterator
}
orderInfoWithProvinceRDD
}
/////////////// Join the user dimension ////////////////////
val orderInfoWithUserDstream: DStream[OrderInfo] = orderInfoWithProvinceDstream.mapPartitions { orderInfoItr =>
val orderList: List[OrderInfo] = orderInfoItr.toList
if(orderList.size>0) {
val userIdList: List[Long] = orderList.map(_.user_id)
val sql = "select id ,user_level , birthday , gender , age_group , gender_name from gmall_user_info where id in ('" + userIdList.mkString("','") + "')"
val userJsonObjList: List[JSONObject] = PhoenixUtil.queryList(sql)
val userJsonObjMap: Map[Long, JSONObject] = userJsonObjList.map(userJsonObj => (userJsonObj.getLongValue("ID"), userJsonObj)).toMap
for (orderInfo <- orderList) {
val userJsonObj: JSONObject = userJsonObjMap.getOrElse(orderInfo.user_id, null)
// Skip users that are not (yet) in gmall_user_info to avoid a NullPointerException
if (userJsonObj != null) {
orderInfo.user_age_group = userJsonObj.getString("AGE_GROUP")
orderInfo.user_gender = userJsonObj.getString("GENDER_NAME")
}
}
}
orderList.toIterator
}
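// The stream is consumed twice below (print and foreachRDD), so cache it to avoid recomputing the joins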
orderInfoWithUserDstream.cache()
orderInfoWithUserDstream.print(1000)
orderInfoWithUserDstream.foreachRDD{rdd=>
val userStatRDD:RDD[UserState] = rdd.filter(_.if_first_order=="1").map(orderInfo=>
UserState(orderInfo.user_id.toString,orderInfo.if_first_order)
)
import org.apache.phoenix.spark._
userStatRDD.saveToPhoenix("user_state",
Seq("USER_ID","IF_CONSUMED"),
new Configuration,
Some("hadoop102,hadoop103,hadoop104:2181"))
/* // Write to ES
// println("order count: " + rdd.count())
orderWithProvinceDstream.foreachRDD{rdd=>
rdd.foreachPartition{orderInfoItr=>
val orderList: List[OrderInfo] = orderInfoItr.toList
val orderWithKeyList: List[(String, OrderInfo)] = orderList.map(orderInfo=>(orderInfo.id.toString,orderInfo))
val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date)
// MyEsUtil.saveBulk(orderWithKeyList,"gmall1122_order_info-"+dateStr)
for (orderInfo <- orderList ) {
println(orderInfo)
MyKafkaSink.send("DW_ORDER_INFO",orderInfo.id.toString,JSON.toJSONString(orderInfo,new SerializeConfig(true)))
}
}
OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)*/
OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)
}
/* dbJsonObjDstream.foreachRDD { rdd =>
rdd.foreachPartition { jsonObjItr =>
for (jsonObj <- jsonObjItr) {
val dataObj: JSONObject = jsonObj.getJSONObject("data")
val tableName = jsonObj.getString("table")
val id = dataObj.getString("id")
val topic = "ODS_T_" + tableName.toUpperCase
MyKafkaSink.send(topic, id, dataObj.toJSONString)
}
}
OffsetManager.saveOffset(groupId, topic, offsetRanges)
}*/
ssc.start()
ssc.awaitTermination()
}
}
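Two supporting classes used above are not listed in this post: the UserState bean written back to the user_state table and the PhoenixUtil query helper. Minimal sketches consistent with how they are called here; the UserState field names and the JDBC details are assumptions:

import java.sql.DriverManager
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

// Two-field bean mapped positionally onto the USER_ID / IF_CONSUMED columns by saveToPhoenix
case class UserState(userId: String, ifConsumed: String)

object PhoenixUtil {
  // Runs a query through the Phoenix JDBC driver and returns each row as a JSONObject.
  // Phoenix reports column names in upper case, which is why the apps read "USER_ID", "NAME", etc.
  def queryList(sql: String): List[JSONObject] = {
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val stat = conn.createStatement()
    val rs = stat.executeQuery(sql)
    val md = rs.getMetaData
    val resultList = new ListBuffer[JSONObject]()
    while (rs.next()) {
      val rowJson = new JSONObject()
      for (i <- 1 to md.getColumnCount) {
        rowJson.put(md.getColumnName(i), rs.getObject(i))
      }
      resultList += rowJson
    }
    rs.close()
    stat.close()
    conn.close()
    resultList.toList
  }
}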
Overall directory structure
Testing
Reposted from: https://blog.csdn.net/qq_40180229/article/details/106219433