
Spark Real-Time Project, Day 6: Joining the Order Table with the User Dimension Table


Creating the HBase Table (via Phoenix)

create table gmall_user_info (id varchar primary key, info.user_level varchar, info.birthday varchar, info.gender varchar, info.age_group varchar, info.gender_name varchar) SALT_BUCKETS = 3
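
To confirm the dimension table exists and is queryable once data starts flowing in, a quick check can be run through the project's PhoenixUtil helper (its signature is taken from how it is used later in this post; the sample query is only illustrative):

import com.alibaba.fastjson.JSONObject
import com.atguigu.gmall.realtime.utils.PhoenixUtil

object UserInfoTableCheck {
  def main(args: Array[String]): Unit = {
    // PhoenixUtil.queryList returns each row as a fastjson JSONObject
    val rows: List[JSONObject] = PhoenixUtil.queryList("select * from GMALL_USER_INFO limit 5")
    rows.foreach(println)
  }
}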

UserInfo

In scala\com\atguigu\gmall\realtime\bean\UserInfo.scala

case class UserInfo(id: String,
                    user_level: String,
                    birthday: String,
                    gender: String,
                    var age_group: String,
                    var gender_name: String)

UserInfoApp

import java.text.SimpleDateFormat
import java.util

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.realtime.bean.UserInfo
import com.atguigu.gmall.realtime.utils.{MyKafkaUtil, OffsetManagerUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}


object UserInfoApp {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dim_user_info_app")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "ODS_T_USER_INFO";
    val groupId = "gmall_user_info_group"


    /////////////////////  Offset handling  ///////////////////////////
    val offset: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(groupId, topic)

    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // If the latest offsets were found in Redis, start consuming Kafka from them; otherwise consume from Kafka's default (latest) position
    if (offset != null && offset.size > 0) {
      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId)
      //startInputDstream.map(_.value).print(1000)
    } else {
      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
    }

    // Capture the offset ranges of the current batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    // Parse each record into a UserInfo bean and derive the age group and gender name
    val userInfoDstream: DStream[UserInfo] = inputGetOffsetDstream.map { record =>
      val userInfoJsonStr: String = record.value()
      val userInfo: UserInfo = JSON.parseObject(userInfoJsonStr, classOf[UserInfo])
      // Approximate the age in years from the birthday (365-day years)
      val formatter = new SimpleDateFormat("yyyy-MM-dd")
      val date: util.Date = formatter.parse(userInfo.birthday)
      val curTs: Long = System.currentTimeMillis()
      val betweenMs = curTs - date.getTime // difference between the two timestamps in milliseconds
      val age = betweenMs / 1000L / 60L / 60L / 24L / 365L
      if (age < 20) {
        userInfo.age_group = "20岁及以下"
      } else if (age > 30) {
        userInfo.age_group = "30岁以上"
      } else {
        userInfo.age_group = "21岁到30岁"
      }
      if (userInfo.gender == "M") {
        userInfo.gender_name = "男"
      } else {
        userInfo.gender_name = "女"
      }
      userInfo
    }

    // Write the enriched user records into the GMALL_USER_INFO Phoenix table, then commit the offsets
    userInfoDstream.foreachRDD { rdd =>
      import org.apache.phoenix.spark._
      rdd.saveToPhoenix("GMALL_USER_INFO",Seq("ID", "USER_LEVEL", "BIRTHDAY", "GENDER","AGE_GROUP","GENDER_NAME")
        ,new Configuration,Some("hadoop102,hadoop103,hadoop104:2181"))

      OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()

  }

}
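
Both applications read and save Kafka offsets through OffsetManagerUtil, which, as the comments above indicate, keeps the offsets in Redis. Its implementation is not part of this post; the following is only a minimal sketch of what such a helper could look like, assuming a Jedis client, a Redis instance on hadoop102:6379, and a hash keyed by "offset:groupId:topic" (all of these details are assumptions, not the project's actual code):

import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

object OffsetManagerUtilSketch {
  // Read the offsets of one consumer group / topic from a Redis hash: field = partition, value = offset
  def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {
    val jedis = new Jedis("hadoop102", 6379) // host/port and key layout are assumptions
    val offsetMap: util.Map[String, String] = jedis.hgetAll("offset:" + groupId + ":" + topic)
    jedis.close()
    offsetMap.asScala.map { case (partition, offset) =>
      (new TopicPartition(topic, partition.toInt), offset.toLong)
    }.toMap
  }

  // Save the end offsets of the batch that has just been processed
  def saveOffset(groupId: String, topic: String, offsetRanges: Array[OffsetRange]): Unit = {
    val offsetMap = new util.HashMap[String, String]()
    for (range <- offsetRanges) {
      offsetMap.put(range.partition.toString, range.untilOffset.toString)
    }
    val jedis = new Jedis("hadoop102", 6379)
    jedis.hmset("offset:" + groupId + ":" + topic, offsetMap)
    jedis.close()
  }
}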

Initializing the Data with maxwell-bootstrap

bin/maxwell-bootstrap --user maxwell  --password 123123 --host hadoop102  --database spark_gmall  --table user_info  --client_id maxwell_1

Modifying OrderInfoApp

In scala\com\atguigu\gmall\realtime\app\dw\OrderInfoApp.scala

import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall.realtime.bean.{OrderInfo, UserState}
import com.atguigu.gmall.realtime.utils.{MyKafkaUtil, OffsetManagerUtil, PhoenixUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object OrderInfoApp {


  def main(args: Array[String]): Unit = {


    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dw_order_info_app")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "ODS_T_ORDER_INFO";
    val groupId = "base_order_info_group"


    /////////////////////  Offset handling  ///////////////////////////
    val offset: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(groupId, topic)

    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // If the latest offsets were found in Redis, start consuming Kafka from them; otherwise consume from Kafka's default (latest) position
    if (offset != null && offset.size > 0) {
      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId)
      //startInputDstream.map(_.value).print(1000)
    } else {
      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
    }

    // Capture the offset ranges of the current batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /////////////////////  Business logic  ///////////////////////////

    // Basic conversion: derive the create_date and create_hour fields from create_time
    val orderInfoDstream: DStream[OrderInfo] = inputGetOffsetDstream.map { record =>
      val jsonString: String = record.value()
      val orderInfo: OrderInfo = JSON.parseObject(jsonString, classOf[OrderInfo])

      // create_time is a "date time" string, split into date and hour
      val datetimeArr: Array[String] = orderInfo.create_time.split(" ")
      orderInfo.create_date = datetimeArr(0)
      val timeArr: Array[String] = datetimeArr(1).split(":")
      orderInfo.create_hour = timeArr(0)

      orderInfo
    }

    // Use mapPartitions so that one SQL query is issued per partition instead of one per record
    val orderInfoWithfirstDstream: DStream[OrderInfo] = orderInfoDstream.mapPartitions { orderInfoItr =>
      // Materialize the partition's orders into a List
      val orderInfoList: List[OrderInfo] = orderInfoItr.toList

      if (orderInfoList.size > 0) {
        // Collect all user ids in this partition
        val userIdList: List[String] = orderInfoList.map(_.user_id.toString)
        // Query the stored consumption state (if_consumed: has this user ordered before?) for the whole batch of user ids
        val sql = "select user_id,if_consumed from user_state where user_id in ('" + userIdList.mkString("','") + "')"
        val userStateList: List[JSONObject] = PhoenixUtil.queryList(sql)

        // Turn the previously stored state into a Map to avoid a nested loop
        // Note: Phoenix returns column names in upper case!
        val userStateMap: Map[String, String] = userStateList.map(userStateJsonObj =>
          (userStateJsonObj.getString("USER_ID"), userStateJsonObj.getString("IF_CONSUMED"))
        ).toMap

        // Compare the current batch against the state already in the database
        for (orderInfo <- orderInfoList) {
          // Look up the user's if_consumed flag; null means the user has never ordered
          val userIfConsumed: String = userStateMap.getOrElse(orderInfo.user_id.toString, null)
          if (userIfConsumed != null && userIfConsumed == "1") {
            // The user has consumed before, so this is not a first order
            orderInfo.if_first_order = "0"
          } else {
            // No previous state: this is a first order
            orderInfo.if_first_order = "1"
          }
        }
      }
      orderInfoList.toIterator
    }

    // Fix: if a first-time buyer places several orders in the same batch, all of them would otherwise be flagged as first orders
    // Key the orders by user_id
    val orderInfoWithUidDstream: DStream[(Long, OrderInfo)] = orderInfoWithfirstDstream.map(orderInfo => (orderInfo.user_id, orderInfo))
    // Group each user's orders within the batch
    val orderInfoGroupbyUidDstream: DStream[(Long, Iterable[OrderInfo])] = orderInfoWithUidDstream.groupByKey()
    // Flatten back to individual orders
    val orderInfoFinalFirstDstream: DStream[OrderInfo] = orderInfoGroupbyUidDstream.flatMap { case (userId, orderInfoItr) =>
      val orderInfoList: List[OrderInfo] = orderInfoItr.toList
      // Only groups flagged as first orders that contain more than one order need adjusting
      if (orderInfoList(0).if_first_order == "1" && orderInfoList.size > 1) {
        // Sort this user's orders by creation time
        val orderInfoSortedList: List[OrderInfo] = orderInfoList.sortWith { (orderInfo1, orderInfo2) =>
          orderInfo1.create_time < orderInfo2.create_time
        }
        // Keep the earliest order as the first order and mark the rest as non-first
        for (i <- 1 to orderInfoSortedList.size - 1) {
          orderInfoSortedList(i).if_first_order = "0"
        }
        orderInfoSortedList.toIterator
      } else {
        orderInfoList.toIterator
      }
    }


    //////////////// Join the province dimension ////////////////

    val orderInfoWithProvinceDstream: DStream[OrderInfo] = orderInfoFinalFirstDstream.transform { rdd =>

      // Runs on the driver once per batch
      val sql = "select id,name,region_id,area_code from gmall_province_info"
      // Load the whole province dimension as a List of JSONObject
      val provinceJsonObject: List[JSONObject] = PhoenixUtil.queryList(sql)

      // Turn the List into a Map keyed by province id
      val provinceJsonObjMap: Map[Long, JSONObject] = provinceJsonObject.map { jsonObj =>
        (jsonObj.getLongValue("ID"), jsonObj)
      }.toMap

      // Broadcast the dimension map to the executors
      val provinceJsonObjMapBc: Broadcast[Map[Long, JSONObject]] = ssc.sparkContext.broadcast(provinceJsonObjMap)

      // Runs on the executors
      val orderInfoWithProvinceRDD: RDD[OrderInfo] = rdd.mapPartitions { orderInfoItr =>

        // Read the broadcast dimension map
        val provinceJsonObjMap: Map[Long, JSONObject] = provinceJsonObjMapBc.value

        // An Iterator can only be traversed once, so materialize it into a List first
        val orderInfoList: List[OrderInfo] = orderInfoItr.toList

        for (orderInfo <- orderInfoList) {
          // Look up the order's province in the map; getOrElse returns null if it is missing
          val provinceJsonObj: JSONObject = provinceJsonObjMap.getOrElse(orderInfo.province_id, null)
          if (provinceJsonObj != null) {
            // Copy the province fields onto the order
            orderInfo.province_name = provinceJsonObj.getString("NAME")
            println(orderInfo.province_name + "-----------")
            orderInfo.province_area_code = provinceJsonObj.getString("AREA_CODE")
          }
        }

        orderInfoList.toIterator
      }

      orderInfoWithProvinceRDD
    }


    /////////////// Join the user dimension ////////////////////
    val orderInfoWithUserDstream: DStream[OrderInfo] = orderInfoWithProvinceDstream.mapPartitions { orderInfoItr =>
      val orderList: List[OrderInfo] = orderInfoItr.toList
      if (orderList.size > 0) {
        val userIdList: List[Long] = orderList.map(_.user_id)
        // Query the user dimension table written by UserInfoApp for this batch of user ids
        val sql = "select id,user_level,birthday,gender,age_group,gender_name from gmall_user_info where id in ('" + userIdList.mkString("','") + "')"
        val userJsonObjList: List[JSONObject] = PhoenixUtil.queryList(sql)
        val userJsonObjMap: Map[Long, JSONObject] = userJsonObjList.map(userJsonObj => (userJsonObj.getLongValue("ID"), userJsonObj)).toMap
        for (orderInfo <- orderList) {
          val userJsonObj: JSONObject = userJsonObjMap.getOrElse(orderInfo.user_id, null)
          // Guard against users missing from the dimension table to avoid a NullPointerException
          if (userJsonObj != null) {
            orderInfo.user_age_group = userJsonObj.getString("AGE_GROUP")
            orderInfo.user_gender = userJsonObj.getString("GENDER_NAME")
          }
        }
      }
      orderList.toIterator
    }

    orderInfoWithUserDstream.cache()

    orderInfoWithUserDstream.print(1000)


    orderInfoWithUserDstream.foreachRDD { rdd =>
      // Record the users who placed a first order in this batch so later batches treat them as consumed
      val userStatRDD: RDD[UserState] = rdd.filter(_.if_first_order == "1").map(orderInfo =>
        UserState(orderInfo.user_id.toString, orderInfo.if_first_order)
      )
      import org.apache.phoenix.spark._
      userStatRDD.saveToPhoenix("user_state",
        Seq("USER_ID", "IF_CONSUMED"),
        new Configuration,
        Some("hadoop102,hadoop103,hadoop104:2181"))

/*      // Write to ES
      //   println("order count: " + rdd.count())
      orderWithProvinceDstream.foreachRDD{rdd=>
        rdd.foreachPartition{orderInfoItr=>
          val orderList: List[OrderInfo] = orderInfoItr.toList
          val orderWithKeyList: List[(String, OrderInfo)] = orderList.map(orderInfo=>(orderInfo.id.toString,orderInfo))
          val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date)
          //  MyEsUtil.saveBulk(orderWithKeyList,"gmall1122_order_info-"+dateStr)

          for (orderInfo <- orderList ) {
            println(orderInfo)
            MyKafkaSink.send("DW_ORDER_INFO",orderInfo.id.toString,JSON.toJSONString(orderInfo,new SerializeConfig(true)))
          }

        }

        OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)*/
      OffsetManagerUtil.saveOffset(groupId, topic, offsetRanges)

    }

    /* dbJsonObjDstream.foreachRDD { rdd =>
       rdd.foreachPartition { jsonObjItr =>

         for (jsonObj <- jsonObjItr) {
           val dataObj: JSONObject = jsonObj.getJSONObject("data")
           val tableName = jsonObj.getString("table")
           val id = dataObj.getString("id")
           val topic = "ODS_T_" + tableName.toUpperCase
           MyKafkaSink.send(topic, id, dataObj.toJSONString)
         }
       }
       OffsetManager.saveOffset(groupId, topic, offsetRanges)

     }*/
    ssc.start()
    ssc.awaitTermination()

  }
}
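
Both applications also rely on PhoenixUtil.queryList, which runs a SQL statement against Phoenix and returns each row as a fastjson JSONObject with upper-cased column names (which is why the code above reads "USER_ID", "NAME", "AGE_GROUP", and so on). The helper itself is not shown in this post; a minimal sketch, assuming the standard Phoenix JDBC driver and the same ZooKeeper quorum used above:

import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

object PhoenixUtilSketch {
  def queryList(sql: String): List[JSONObject] = {
    // Explicit driver load; may be optional with JDBC 4 auto-registration
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    // The Phoenix JDBC URL points at the HBase ZooKeeper quorum
    val conn: Connection = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val stmt = conn.createStatement()
    val rs: ResultSet = stmt.executeQuery(sql)
    val md: ResultSetMetaData = rs.getMetaData
    val resultList = new ListBuffer[JSONObject]()
    while (rs.next()) {
      val row = new JSONObject()
      // Phoenix returns upper-cased column labels, matching the keys used by the callers
      for (i <- 1 to md.getColumnCount) {
        row.put(md.getColumnLabel(i), rs.getObject(i))
      }
      resultList += row
    }
    stmt.close()
    conn.close()
    resultList.toList
  }
}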

Overall Directory Structure

Testing
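
With both applications running, trigger the maxwell-bootstrap command above, generate some orders, and check that the printed order records carry user_age_group and user_gender. A minimal sketch of a spot check on the Phoenix tables, reusing the assumed PhoenixUtil helper (table and column names follow the DDL used in this series):

import com.atguigu.gmall.realtime.utils.PhoenixUtil

object DimJoinCheck {
  def main(args: Array[String]): Unit = {
    // The user dimension should have been populated by UserInfoApp
    println(PhoenixUtil.queryList("select count(*) as CNT from GMALL_USER_INFO"))
    // First-order state written by OrderInfoApp
    println(PhoenixUtil.queryList("select * from USER_STATE limit 5"))
  }
}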


Reposted from: https://blog.csdn.net/qq_40180229/article/details/106219433