
Real-Time Data Warehouse: Designing and Implementing a Real-Time Data Warehouse from 0 to 1 (Spark Streaming 3.x)


Introduction

This project covers the design and implementation of a real-time data warehouse end to end: from database design through the web front end and back end to the warehouse itself. Its defining trait is that it is small but complete ("the sparrow is small, yet has all five organs"); for a typical real-time warehouse you can largely copy the code here and have a working pipeline.

Data Flow Diagram

Requirements

To tap into students' competitive drive and encourage good study habits, we build a nationwide statistic of the total score contributed by graduating students in each region, and finally rank the regions by the total score their students have earned (each student submits their best score for every course).

Code

Initial Web Application Development

Spring Boot, MyBatis-Plus, MySQL, HTML, JS

Table Design

Resources

Link: https://pan.baidu.com/s/1bWf6rEPMBKvY3wZbsNLtUA
Access code: yyds


 

Start the Spring Boot application first; only then will the HTML page be able to fetch its data.

Screenshot

Data flow: click Submit -> Spring Boot -> MySQL

Query to inspect the saved data


  
SELECT stu_name, score, area_name, course_name
FROM tb_stu
LEFT JOIN tb_area ON tb_stu.area_code = tb_area.area_code
LEFT JOIN tb_stu_course ON tb_stu.id = tb_stu_course.stu_id
LEFT JOIN tb_course ON tb_stu_course.course_id = tb_course.id
LEFT JOIN tb_tearch ON tb_course.teacher_id = tb_tearch.id;

Building the Data Warehouse

Data Collection

Prerequisite: Kafka is already installed.

(Installation is covered in my Kafka column.)

Enable the MySQL binlog

vi /etc/my.cnf

  
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=school

Collect MySQL changes with Maxwell

tar -zxvf maxwell-1.25.0.tar.gz

Create the database where Maxwell stores its positions


  
mysql -uroot -p123456
CREATE DATABASE schoolmaxwell;
# 'maxwell' is the user name
GRANT ALL ON schoolmaxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
# 'maxwell' is the user name
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';

Edit the configuration

cp config.properties.example config.properties

  
producer=kafka
kafka.bootstrap.servers=master:9092,node1:9092,node2:9092
# needs to be added
kafka_topic=school_db
# MySQL
# database host
host=master
# database user
user=maxwell
# database password
password=123456
# needs to be added; used later by maxwell-bootstrap
client_id=maxwell_1
# database where Maxwell keeps its schema/position data (must match the database created above)
schema_database=schoolmaxwell

Test that it works

Start Maxwell with the config file

/home/bigdata/conglingdaoyi/maxwell/maxwell-1.25.0/bin/maxwell --config /home/bigdata/conglingdaoyi/maxwell/maxwell-1.25.0/config.properties

Sync (bootstrap) a table to Kafka. The command below only works if Maxwell is already running:

bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database school --table tb_stu --client_id maxwell_1
./kafka-topics.sh --bootstrap-server master:9092 --list

Consume the topic

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic school_db

Below is an example of the collected data.

ODS Layer

Read the data from Kafka and split it by table to produce the ODS topics.

Exactly-once consumption
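The pattern used throughout this project, sketched below as a minimal skeleton. It assumes offsets are kept in Redis by the OffsetManagerUtil and MyKafkaUtil helpers defined later in this post, and the topic and group names match the ODS app below: load the saved offsets before creating the direct stream, capture each batch's OffsetRange array inside a transform, and write the new offsets back only after the batch output has been produced.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExactlyOnceSkeleton {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("skeleton").setMaster("local[4]"), Seconds(1))
    val (topic, group) = ("school_db", "test")
    // 1. load the offsets saved after the last successful batch (stored in Redis here)
    val saved = OffsetManagerUtil.getOfferSet(topic, group)
    val stream: InputDStream[ConsumerRecord[String, String]] =
      if (saved != null && saved.nonEmpty) MyKafkaUtil.getInputDStreamByMapTopicPartition(topic, saved, ssc)
      else MyKafkaUtil.getInputDStreamByDefault(topic, ssc)
    // 2. capture this batch's offset ranges before any other transformation
    var ranges: Array[OffsetRange] = Array.empty[OffsetRange]
    val withOffsets = stream.transform { rdd =>
      ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    withOffsets.map(_.value()).foreachRDD { rdd =>
      // 3. write the batch output (to Kafka / Phoenix / MySQL) ...
      // 4. only after the output succeeds, persist the new offsets
      OffsetManagerUtil.saveOffset(topic, group, ranges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}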

pom.xml


  
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.12.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.12.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>5.0.0-HBase-2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <id>shade-my-jar</id>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--
                                    zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                    -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- change this to your own main class -->
                                <mainClass>com.zhang.realtime.app.app.SparkStreamKafkaOfferset</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Configuration files

log4j.properties


  
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.out
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger=error

hbase-site.xml


  
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
    <property>
        <name>hbase.regionserver.wal.codec</name>
        <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
    </property>
    <!-- Note: to enable the mapping between HBase namespaces and Phoenix schemas, this config file must be on the
         program's classpath. On the Linux servers the two namespace-mapping properties must also be added to the
         hbase-site.xml used by HBase and by Phoenix, and synced across the nodes (e.g. with xsync). -->
    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>
    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>
</configuration>

config.properties


  
# Kafka
kafka.broker.list=master:9092,node1:9092,node2:9092
# Redis
redis.host=node1
redis.port=6379

Utility classes

PhoenixUtil

  
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

object PhoenixUtil {
  def queryList(sql: String): List[JSONObject] = {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    val resultList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]()
    val conn: Connection =
      DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181")
    val stat: Statement = conn.createStatement
    println(sql)
    val rs: ResultSet = stat.executeQuery(sql)
    val md: ResultSetMetaData = rs.getMetaData
    while (rs.next) {
      val rowData = new JSONObject()
      for (i <- 1 to md.getColumnCount) {
        rowData.put(md.getColumnName(i), rs.getObject(i))
      }
      resultList += rowData
    }
    stat.close()
    conn.close()
    resultList.toList
  }
}
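A quick usage sketch (it mirrors how the DWD apps below call this helper; it assumes the SCHOOL_DIM_TB_AREA table created in the DIM section already contains data):

object PhoenixUtilTest {
  def main(args: Array[String]): Unit = {
    val areas = PhoenixUtil.queryList("select * from SCHOOL_DIM_TB_AREA")
    // each row comes back as a JSONObject keyed by the upper-case column names,
    // e.g. {"ID":"1","AREA_CODE":"0001","AREA_NAME":"华东地区"}
    areas.foreach(println)
  }
}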

OffsetManagerUtil


  
import java.util
import java.util.Properties
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool}
import scala.collection.mutable

object OffsetManagerUtil {
  // load the configuration file
  val properties: Properties = MyPropertiesUtil.load("config.properties")
  // Redis connection pool
  val jedisPool = new JedisPool(properties.getProperty("redis.host"), properties.getProperty("redis.port").toInt)

  /**
   * @param topic        topic whose offsets are being saved
   * @param groupName    consumer group
   * @param offsetRanges HasOffsetRanges (implemented by the Kafka input DStream's RDDs) records, per partition,
   *                     the fromOffset the batch started at and the untilOffset it ended at
   */
  def saveOffset(topic: String, groupName: String, offsetRanges: Array[OffsetRange]): Unit = {
    if (offsetRanges != null && offsetRanges.length > 0) {
      // get a connection
      val jedisClient: Jedis = jedisPool.getResource
      // the hash key is "offset:"+topic+":"+groupName
      val offsetHashKey = "offset:" + topic + ":" + groupName
      val offsetMap = new mutable.HashMap[String, String]()
      for (elem <- offsetRanges) {
        // partition that was consumed
        val partition: Int = elem.partition
        // offset at which the next batch for this partition should start
        val untilOffset: Long = elem.untilOffset
        // partition as the key, next starting offset as the value; right after starting with "latest",
        // fromOffset and untilOffset are equal (untilOffset is the exclusive ending offset)
        println(elem.fromOffset.toString + " " + untilOffset.toString)
        offsetMap.put(partition.toString, untilOffset.toString)
      }
      // implicit conversion between Java and Scala collections
      import scala.collection.JavaConverters._
      // Redis throws an error if you hmset an empty map
      if (!offsetMap.isEmpty) {
        jedisClient.hmset(offsetHashKey, offsetMap.asJava)
      }
      jedisPool.returnResource(jedisClient)
    }
  }

  /**
   * Get the starting offsets for a topic.
   *
   * @param topic     topic
   * @param groupName consumer group name
   */
  def getOfferSet(topic: String, groupName: String): mutable.Map[TopicPartition, Long] = {
    val jedisClient: Jedis = jedisPool.getResource
    // the partitions consumed last time, keyed by topic and consumer group
    val offsetHashKey = "offset:" + topic + ":" + groupName
    val result: util.Map[String, String] = jedisClient.hgetAll(offsetHashKey)
    // return the connection to the pool once the offsets have been read
    jedisPool.returnResource(jedisClient)
    val target: mutable.Map[TopicPartition, Long] = collection.mutable.Map[TopicPartition, Long]()
    result.entrySet().forEach(
      entry => {
        val partition: String = entry.getKey
        val partitionOffset: String = entry.getValue
        val temp: TopicPartition = new TopicPartition(topic, partition.toInt)
        target.put(temp, partitionOffset.toLong)
      }
    )
    target
  }
}
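For reference, the offsets end up in a Redis hash keyed as offset:&lt;topic&gt;:&lt;group&gt;, one field per partition. A small sketch to inspect it (it uses the MyPropertiesUtil helper defined just below; the key shown matches the topic and group of the ODS app):

import redis.clients.jedis.Jedis

object OffsetInspect {
  def main(args: Array[String]): Unit = {
    val props = MyPropertiesUtil.load("config.properties")
    val jedis = new Jedis(props.getProperty("redis.host"), props.getProperty("redis.port").toInt)
    // prints a map of partition -> next starting offset, e.g. {0=42, 1=40, ...} (values are illustrative)
    println(jedis.hgetAll("offset:school_db:test"))
    jedis.close()
  }
}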

MyPropertiesUtil


  
import java.io.InputStreamReader
import java.util.Properties

object MyPropertiesUtil {
  /**
   * Quick test of loading the configuration.
   */
  def main(args: Array[String]): Unit = {
    val properties: Properties = MyPropertiesUtil.load("config.properties")
    println(properties.getProperty("kafka.broker.list"))
  }

  /**
   * Load a properties file from the classpath.
   */
  def load(propertieName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
    prop
  }
}

MyKafkaUtil


  
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import scala.collection.mutable

object MyKafkaUtil {
  // load the configuration file
  val properties: Properties = MyPropertiesUtil.load("config.properties")
  // Kafka parameters for Spark Streaming's Kafka consumer
  private val kafkaParams = collection.immutable.Map[String, Object](
    // Kafka brokers
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> properties.getProperty("kafka.broker.list"),
    // key deserializer
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    // value deserializer
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    // consumer group id
    ConsumerConfig.GROUP_ID_CONFIG -> "use_a_separate_group_id_for_each_stream",
    // where to start consuming when there is no saved offset
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
    // whether to auto-commit offsets
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
  )

  /**
   * Consume a topic from its default starting position.
   *
   * @param topic            topic to consume
   * @param streamingContext StreamingContext
   */
  def getInputDStreamByDefault(topic: String, streamingContext: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topics = Array(topic)
    KafkaUtils.createDirectStream[String, String](
      streamingContext,
      // PreferBrokers: use when the executors run on the same nodes as the Kafka brokers
      // PreferConsistent: spread partitions evenly across executors (most common)
      // PreferFixed: pin partitions to specific hosts
      PreferConsistent,
      // Subscribe: consume whole topics; Assign: consume specific topic partitions
      Subscribe[String, String](topics, kafkaParams)
    )
  }

  /**
   * @param topic            topic to consume
   * @param offsetRange      Map[TopicPartition, Long]: start consuming each partition at the given offset
   * @param streamingContext StreamingContext
   */
  def getInputDStreamByMapTopicPartition(topic: String, offsetRange: mutable.Map[TopicPartition, Long], streamingContext: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topics = Array(topic)
    KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, offsetRange)
    )
  }
}

MyKafkaSinkUtil


  
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MyKafkaSinkUtil {
  private val properties: Properties = MyPropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")
  var kafkaProducer: KafkaProducer[String, String] = null

  def createKafkaProducer: KafkaProducer[String, String] = {
    val properties = new Properties
    properties.put("bootstrap.servers", broker_list)
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("enable.idempotence", (true: java.lang.Boolean))
    var producer: KafkaProducer[String, String] = null
    try
      producer = new KafkaProducer[String, String](properties)
    catch {
      case e: Exception =>
        e.printStackTrace()
    }
    producer
  }

  def send(topic: String, msg: String): Unit = {
    if (kafkaProducer == null) kafkaProducer = createKafkaProducer
    kafkaProducer.send(new ProducerRecord[String, String](topic, msg))
  }

  def send(topic: String, key: String, msg: String): Unit = {
    if (kafkaProducer == null) kafkaProducer = createKafkaProducer
    kafkaProducer.send(new ProducerRecord[String, String](topic, key, msg))
  }
}
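Usage is a single call, as the apps below show; enable.idempotence=true makes the producer retry-safe. A minimal sketch (the message body is a hypothetical test record, only there to demonstrate the call):

object MyKafkaSinkUtilTest {
  def main(args: Array[String]): Unit = {
    // hypothetical test message in the ODS format produced below
    MyKafkaSinkUtil.send("ods_tb_stu", """{"id":"ea26cc","stu_name":"小张","area_code":"0001"}""")
    // send() is asynchronous, so flush before the JVM exits to make sure the record reaches the broker
    MyKafkaSinkUtil.kafkaProducer.flush()
  }
}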

Stream-splitting app

SchoolAppStart


  
object SchoolAppStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // the number of partitions here should match the number of Kafka partitions
    conf.setAppName(this.getClass.getSimpleName).setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicName = "school_db"
    val groupName = "test"
    // read the saved offsets at startup
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topicName, groupName)
    var kafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
    // if offsets were found, consume from them
    if (offset != null && offset.size > 0) {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topicName, offset, ssc)
    } else {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByDefault(topicName, ssc)
    }
    // cast to HasOffsetRanges to get Array[OffsetRange]; that is the only way to see the untilOffset each batch ends at
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val transformDStream: DStream[ConsumerRecord[String, String]] = kafkaInputDStream.transform(
      rdd => {
        val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        offsetRanges = ranges.offsetRanges
        rdd
      }
    )
    val kafkaValue: DStream[String] = transformDStream.map(_.value())
    // the incoming records look like this:
    // {"database":"school","table":"tb_stu","type":"bootstrap-start","ts":1657768898,"data":{}}
    // {"database":"school","table":"tb_stu","type":"bootstrap-insert","ts":1657768898,"data":{"id":"ea26cc","stu_name":"小张","area_code":"0001"}}
    // {"database":"school","table":"tb_stu","type":"bootstrap-insert","ts":1657768898,"data":{"id":"e8e4ee","stu_name":"小名","area_code":"0002"}}
    // {"database":"school","table":"tb_stu","type":"bootstrap-complete","ts":1657768898,"data":{}}
    // split the school_db stream by table
    val distrubeRes: DStream[JSONObject] = kafkaValue.mapPartitions(iter => {
      // collected results
      val res: ListBuffer[JSONObject] = ListBuffer[JSONObject]()
      for (elem <- iter) {
        val kafkaValueJson: JSONObject = JSON.parseObject(elem)
        val tableName: String = kafkaValueJson.getString("table")
        val dataJson: JSONObject = kafkaValueJson.getJSONObject("data")
        val maxwellType: String = kafkaValueJson.getString("type")
        if (dataJson != null && !dataJson.isEmpty) {
          // route to Kafka
          dataJson.put("maxwelltype", maxwellType)
          dataJson.put("tableName", tableName)
          res.append(dataJson)
        }
      }
      res.iterator
    })
    distrubeRes.foreachRDD(rdd => {
      rdd.foreach(item => {
        val tableName: String = item.getString("tableName")
        item.remove("tableName")
        val resTopic = "ods_" + tableName
        MyKafkaSinkUtil.send(resTopic, item.toString)
      })
      // once everything has been written, save the offsets
      OffsetManagerUtil.saveOffset(topicName, groupName, offsetRanges)
      // and commit the Kafka offsets manually
      kafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

ODS result

A sample record in one of the ODS topics:

{"maxwelltype":"bootstrap-insert","id":"2","tearch_name":"小张老师"}


  
case class TbTearch(
  id: String,
  tearchName: String
)

fastjson maps the snake_case key (tearch_name) to the camelCase field directly:

JSON.toJavaObject(JSON.parseObject(kafkaValueItem), classOf[TbTearch])

Project structure

DIM Layer

Since our feature is a real-time, per-region statistic of the scores students contribute, we can use ods_tb_tearch, ods_tb_course and ods_tb_area: teacher, course and area are the dimensions for the analysis.

We store the dimension tables in HBase (accessed through Phoenix); its advantages are large capacity and fast lookups.

First create the teacher dimension table:

create table school_dim_tb_tearch (id varchar primary key ,tearch_name varchar) SALT_BUCKETS = 3;

The app that saves to Phoenix

DimTbTearch 


  
object DimTbTearch {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("BaseTrademarkApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "ods_tb_tearch"
    val groupId = "dim_ods_tb_tearch"
    // offset handling //
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topic, groupId)
    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // if Redis has offsets for this group, start from them; otherwise start from the default position
    if (offset != null && offset.size > 0) {
      inputDstream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topic, offset, ssc)
    } else {
      inputDstream = MyKafkaUtil.getInputDStreamByDefault(topic, ssc)
    }
    // capture the offset ranges of each batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
      inputDstream.transform {
        rdd => {
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }
      }
    val objectDstream: DStream[TbTearch] = inputGetOffsetDstream.map {
      record => {
        val jsonStr: String = record.value()
        val obj: TbTearch = JSON.parseObject(jsonStr, classOf[TbTearch])
        obj
      }
    }
    import org.apache.phoenix.spark._
    // save to HBase via Phoenix
    objectDstream.foreachRDD { rdd =>
      rdd.cache()
      rdd.saveToPhoenix("SCHOOL_DIM_TB_TEARCH", Seq("ID", "TEARCH_NAME"),
        new Configuration, Some("192.168.66.20,192.168.66.10,192.168.66.21:2181"))
      rdd.foreach(println)
      OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

After it is running, execute:

bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database school --table tb_tearch --client_id maxwell_1

Result

Kafka console consumer:

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic ods_tb_tearch

DimTbCourse

Bootstrap the table and check the topic output:

bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database school --table tb_course --client_id maxwell_1
./kafka-console-consumer.sh --bootstrap-server master:9092 --topic ods_tb_course
{"maxwelltype":"bootstrap-insert","course_name":"美术","teacher_id":"1","id":"5"}

Create the corresponding case class


  
case class TbCourse(
  id: String,
  course_name: String,
  teacher_id: String
)

Create the Phoenix table

create table school_dim_tb_course (id varchar primary key ,tearch_id varchar,course_name varchar) SALT_BUCKETS = 3;
select * from SCHOOL_DIM_TB_COURSE;

app 


  
object DimTbCourse {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("BaseTrademarkApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "ods_tb_course"
    val groupId = "dim_ods_tb_stu_course"
    // offset handling //
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topic, groupId)
    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // if Redis has offsets for this group, start from them; otherwise start from the default position
    if (offset != null && offset.size > 0) {
      inputDstream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topic, offset, ssc)
    } else {
      inputDstream = MyKafkaUtil.getInputDStreamByDefault(topic, ssc)
    }
    // capture the offset ranges of each batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
      inputDstream.transform {
        rdd => {
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }
      }
    val objectDstream: DStream[TbCourse] = inputGetOffsetDstream.map {
      record => {
        val jsonStr: String = record.value()
        val obj: TbCourse = JSON.parseObject(jsonStr, classOf[TbCourse])
        obj
      }
    }
    import org.apache.phoenix.spark._
    // save to HBase via Phoenix
    objectDstream.foreachRDD { rdd =>
      rdd.cache()
      rdd.saveToPhoenix("SCHOOL_DIM_TB_COURSE", Seq("ID", "COURSE_NAME", "TEARCH_ID"),
        new Configuration, Some("192.168.66.20,192.168.66.10,192.168.66.21:2181"))
      rdd.foreach(println)
      OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

DimTbArea

bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database school --table tb_area --client_id maxwell_1

  
  1. { "area_name" : "华东地区" , "maxwelltype" : "bootstrap-insert" , "area_code" : "0001" , "id" : "1" }
  2. { "area_name" : "华南地区" , "maxwelltype" : "bootstrap-insert" , "area_code" : "0002" , "id" : "2" }
  3. { "area_name" : "华北地区" , "maxwelltype" : "bootstrap-insert" , "area_code" : "0003" , "id" : "3" }

Create the case class for the JSON


  
case class TbAear(
  id: String,
  areaCode: String,
  areaName: String
)

Create the Phoenix table

create table school_dim_tb_area (id varchar primary key ,area_name varchar,area_code varchar) SALT_BUCKETS = 3;
select * from SCHOOL_DIM_TB_AREA;

 app


  
object DimTbArea {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("DimTbArea")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "ods_tb_area"
    val groupId = "dim_ods_tb_area"
    // offset handling //
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topic, groupId)
    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // if Redis has offsets for this group, start from them; otherwise start from the default position
    if (offset != null && offset.size > 0) {
      inputDstream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topic, offset, ssc)
    } else {
      inputDstream = MyKafkaUtil.getInputDStreamByDefault(topic, ssc)
    }
    // capture the offset ranges of each batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
      inputDstream.transform {
        rdd => {
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }
      }
    val objectDstream: DStream[TbAear] = inputGetOffsetDstream.map {
      record => {
        val jsonStr: String = record.value()
        val obj: TbAear = JSON.parseObject(jsonStr, classOf[TbAear])
        obj
      }
    }
    import org.apache.phoenix.spark._
    // save to HBase via Phoenix
    objectDstream.foreachRDD { rdd =>
      rdd.cache()
      rdd.saveToPhoenix("SCHOOL_DIM_TB_AREA", Seq("ID", "AREA_CODE", "AREA_NAME"),
        new Configuration, Some("192.168.66.20,192.168.66.10,192.168.66.21:2181"))
      rdd.foreach(println)
      OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

DIM layer results


  
select * from SCHOOL_DIM_TB_AREA;
select * from SCHOOL_DIM_TB_COURSE;
select * from SCHOOL_DIM_TB_TEARCH;

DWD Layer

DwdTbStu

Get the data

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic ods_tb_stu

Click Submit.

The record we get:

{"maxwelltype":"insert","area_code":"0001","stu_name":"12","id":"fd4fa5"}

Processing the data in ods_tb_stu:

  • Step 1: load all area dimension records and turn them into an RDD of (area_code, area_name) pairs.
  • Step 2: join that RDD with the ods_tb_stu stream to produce the DWD fact data.

Implementation


  
object DwdTbStu {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // the number of partitions here should match the number of Kafka partitions
    conf.setAppName(this.getClass.getSimpleName).setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicName = "ods_tb_stu"
    val groupName = "test"
    // read the saved offsets at startup
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topicName, groupName)
    var kafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
    // if offsets were found, consume from them
    if (offset != null && offset.size > 0) {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topicName, offset, ssc)
    } else {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByDefault(topicName, ssc)
    }
    // cast to HasOffsetRanges to get Array[OffsetRange] and the untilOffset each batch ends at
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val transformDStream: DStream[ConsumerRecord[String, String]] = kafkaInputDStream.transform(
      rdd => {
        val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        offsetRanges = ranges.offsetRanges
        rdd
      }
    )
    val kafkaValue: DStream[String] = transformDStream.map(_.value())
    val allAear: List[JSONObject] = PhoenixUtil.queryList("select * from SCHOOL_DIM_TB_AREA")
    val resAllAear: List[(String, String)] = allAear.map(allAearItem => {
      (allAearItem.getString("AREA_CODE"), allAearItem.getString("AREA_NAME"))
    })
    // RDD of area dimension records
    val aearRDD: RDD[(String, String)] = ssc.sparkContext.parallelize(resAllAear)
    val aearKeyAndTbstuValue: DStream[(String, TbStu)] = kafkaValue.mapPartitions(iter => {
      val res: ListBuffer[(String, TbStu)] = ListBuffer[(String, TbStu)]()
      for (elem <- iter) {
        val targetTbStu: TbStu = JSON.toJavaObject(JSON.parseObject(elem), classOf[TbStu])
        res.append((targetTbStu.areaCode, targetTbStu))
      }
      res.iterator
    })
    // join the incoming Stu records with the area RDD
    val res: DStream[TbStu] = aearKeyAndTbstuValue.transform(rdd => {
      val res: RDD[(String, (TbStu, Option[String]))] = rdd.leftOuterJoin(aearRDD)
      res.map(item => {
        val tbStu: TbStu = item._2._1
        val aearName: Option[String] = item._2._2
        tbStu.aearName = aearName.getOrElse("")
        tbStu
      })
    })
    // write out the enriched records
    res.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        for (tbStuItem <- iter) {
          val tbStuItemJSONString: String = JSON.toJSONString(tbStuItem, JSON.DEFAULT_GENERATE_FEATURE)
          MyKafkaSinkUtil.send("dwd_tb_stu", tbStuItemJSONString)
        }
      })
      // the area name has been assigned partition by partition above
      // once everything has been written, save the offsets
      OffsetManagerUtil.saveOffset(topicName, groupName, offsetRanges)
      // and commit the Kafka offsets manually
      kafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Check the resulting data

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic dwd_tb_stu

After clicking Submit:

We get:

{"aearName":"华北地区","areaCode":"0003","id":"52f31b","stuName":"同学你好"}

You can see that ods_tb_stu has been matched against the dimension data.

Converting a Scala object to JSON


  
import scala.beans.BeanProperty

case class TbStu(
  @BeanProperty id: String,
  @BeanProperty areaCode: String,
  @BeanProperty stuName: String,
  @BeanProperty var aearName: String
)

val tbStuItemJSONString: String = JSON.toJSONString(tbStuItem, JSON.DEFAULT_GENERATE_FEATURE)
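A quick round-trip check: the @BeanProperty annotations generate the getters fastjson needs, and JSON.DEFAULT_GENERATE_FEATURE is the same flag the DWD app passes. The sample values below are taken from the dwd_tb_stu record shown above.

import com.alibaba.fastjson.JSON

object TbStuJsonTest {
  def main(args: Array[String]): Unit = {
    val stu = TbStu("52f31b", "0003", "同学你好", "华北地区")
    // should print: {"aearName":"华北地区","areaCode":"0003","id":"52f31b","stuName":"同学你好"}
    println(JSON.toJSONString(stu, JSON.DEFAULT_GENERATE_FEATURE))
  }
}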

DwdTbStuCourse


  
object DwdTbStuCourse {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // the number of partitions here should match the number of Kafka partitions
    conf.setAppName(this.getClass.getSimpleName).setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicName = "ods_tb_stu_course"
    val groupName = "test"
    // read the saved offsets at startup
    val offset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(topicName, groupName)
    var kafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
    // if offsets were found, consume from them
    if (offset != null && offset.size > 0) {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByMapTopicPartition(topicName, offset, ssc)
    } else {
      kafkaInputDStream = MyKafkaUtil.getInputDStreamByDefault(topicName, ssc)
    }
    // cast to HasOffsetRanges to get Array[OffsetRange] and the untilOffset each batch ends at
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val transformDStream: DStream[ConsumerRecord[String, String]] = kafkaInputDStream.transform(
      rdd => {
        val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        offsetRanges = ranges.offsetRanges
        rdd
      }
    )
    val kafkaValue: DStream[String] = transformDStream.map(_.value())
    val dim_course: List[JSONObject] = PhoenixUtil.queryList("select * from SCHOOL_DIM_TB_COURSE")
    val dim_tearch: List[JSONObject] = PhoenixUtil.queryList("select * from SCHOOL_DIM_TB_TEARCH")
    val tempDim_course: List[(String, JSONObject)] = dim_course.map(item => {
      val tearchId: String = item.getString("TEARCH_ID")
      (tearchId, item)
    })
    val tempDim_tearch: List[(String, String)] = dim_tearch.map(item => {
      val tearchId: String = item.getString("ID")
      (tearchId, item.getString("TEARCH_NAME"))
    })
    val tempDim_courseRDD: RDD[(String, JSONObject)] = ssc.sparkContext.parallelize(tempDim_course)
    val tempDim_tearchRDD: RDD[(String, String)] = ssc.sparkContext.parallelize(tempDim_tearch)
    val courseAndTearchName: RDD[(String, (String, JSONObject))] = tempDim_tearchRDD.join(tempDim_courseRDD)
    val dim_res: RDD[(String, (String, String))] = courseAndTearchName.map(item => {
      val course: JSONObject = item._2._2
      val courseId: String = course.getString("ID")
      val courseName: String = course.getString("COURSE_NAME")
      val teachName: String = item._2._1
      val res: (String, (String, String)) = (courseId, (courseName, teachName))
      res
    })
    dim_res.cache()
    // dim_res looks like:
    // (2,(语文,小张老师))
    // (1,(数学,小李老师))
    // (5,(美术,小李老师))
    // (3,(体育,小明老师))
    // (4,(英语,小明老师))
    // kafkaValue looks like:
    // {"stu_id":"0d1269","course_id":"2","score":"120","maxwelltype":"insert","id":"8bc0ed"}
    val targetTbStuCourse: DStream[(String, TbStuCourse)] = kafkaValue.map(item => {
      val tbStuCourseItem: TbStuCourse = JSON.toJavaObject(JSON.parseObject(item), classOf[TbStuCourse])
      (tbStuCourseItem.courseId, tbStuCourseItem)
    })
    val res: DStream[TbStuCourse] = targetTbStuCourse.transform(rdd => {
      val res: RDD[(String, (TbStuCourse, (String, String)))] = rdd.join(dim_res)
      res.map(item => {
        val tbStuCourse: TbStuCourse = item._2._1
        val courseNameAndTearchName: (String, String) = item._2._2
        tbStuCourse.courseName = courseNameAndTearchName._1
        tbStuCourse.tearchName = courseNameAndTearchName._2
        tbStuCourse
      })
    })
    res.foreachRDD(rdd => {
      rdd.foreach(item => {
        val resjson: String = JSON.toJSONString(item, JSON.DEFAULT_GENERATE_FEATURE)
        MyKafkaSinkUtil.send("dwd_tb_stu_course", resjson)
      })
      // once everything has been written, save the offsets
      OffsetManagerUtil.saveOffset(topicName, groupName, offsetRanges)
      // and commit the Kafka offsets manually
      kafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

The resulting data:

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic dwd_tb_stu_course
{"courseId":"2","courseName":"语文","id":"d2f9f4","score":"120","stuId":"cbb80e","tearchName":"小张老师"}

DWS Layer

Dual-stream join

DwsStuScoreJoin


  
object DwsStuScoreJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // the number of partitions here should match the number of Kafka partitions
    conf.setAppName(this.getClass.getSimpleName).setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // read the two streams separately
    val stuCourseTopicName = "dwd_tb_stu_course"
    val stuCourseGroupName = "dwd_tb_stu_course"
    val stuTopicName = "dwd_tb_stu"
    val stuGroupName = "dwd_tb_stu"
    // DStream for dwd_tb_stu_course
    val stuCourseOffset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(stuCourseTopicName, stuCourseGroupName)
    var stuCourseKafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
    if (stuCourseOffset != null && stuCourseOffset.size > 0) {
      stuCourseKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByMapTopicPartition(stuCourseTopicName, stuCourseOffset, ssc)
    } else {
      stuCourseKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByDefault(stuCourseTopicName, ssc)
    }
    var stuCourseOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val stuCourseTransformDStream: DStream[ConsumerRecord[String, String]] = stuCourseKafkaInputDStream.transform(
      rdd => {
        val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        stuCourseOffsetRanges = ranges.offsetRanges
        rdd
      }
    )
    val stuCourseKafkaValue: DStream[String] = stuCourseTransformDStream.map(_.value())
    // DStream for dwd_tb_stu
    val stuOffset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(stuTopicName, stuGroupName)
    var stuKafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
    if (stuOffset != null && stuOffset.size > 0) {
      stuKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByMapTopicPartition(stuTopicName, stuOffset, ssc)
    } else {
      stuKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByDefault(stuTopicName, ssc)
    }
    var stuOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val stuTransformDStream: DStream[ConsumerRecord[String, String]] = stuKafkaInputDStream.transform(
      rdd => {
        val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        stuOffsetRanges = ranges.offsetRanges
        rdd
      }
    )
    val stuKafkaValue: DStream[String] = stuTransformDStream.map(_.value())
    // debug scaffolding kept for reference: print each stream and commit its offsets
    // stuCourseKafkaValue.foreachRDD(rdd => {
    //   rdd.foreach(item => {
    //     println(item)
    //     // {"courseId":"1","courseName":"数学","id":"069428","score":"120","stuId":"6be3f5","tearchName":"小李老师"}
    //   })
    //   OffsetManagerUtil.saveOffset(stuCourseTopicName, stuCourseGroupName, stuCourseOffsetRanges)
    //   stuCourseKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuCourseOffsetRanges)
    // })
    // stuKafkaValue.foreachRDD(rdd => {
    //   rdd.foreach(item => {
    //     println(item)
    //     // {"aearName":"华东地区","areaCode":"0001","id":"6be3f5","stuName":"小同学"}
    //   })
    //   OffsetManagerUtil.saveOffset(stuTopicName, stuGroupName, stuOffsetRanges)
    //   stuKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuOffsetRanges)
    // })
    // At this point the two streams carry:
    //   first stream  (stuCourseKafkaValue): {"courseId":"1","courseName":"数学","id":"069428","score":"120","stuId":"6be3f5","tearchName":"小李老师"}
    //   second stream (stuKafkaValue):       {"aearName":"华东地区","areaCode":"0001","id":"6be3f5","stuName":"小同学"}
    // Below is the dual-stream join. Using Redis as a cache, and assuming a student can submit several
    // score records at once, there are three cases:
    //   1. the stu record arrives and its stuCourse record has also arrived
    //   2. the stu record arrives but its stuCourse record has not arrived yet
    //   3. the stu record has not arrived but a stuCourse record has
    // Because scores are joined through the student, the stu record is always cached in Redis, whether or not
    // its stuCourse records have arrived, so that late-arriving data can still be matched.
    // If stuCourse records were cached in Redis, they are deleted once the matching stu record arrives.
    // 1. turn both streams into key/value pairs so they can be joined
    val stuIdAndStu: DStream[(String, TbStu)] = stuKafkaValue.map(item => {
      val stu: TbStu = JSON.toJavaObject(JSON.parseObject(item), classOf[TbStu])
      (stu.id, stu)
    })
    val stuIdAndCourse: DStream[(String, TbStuCourse)] = stuCourseKafkaValue.map(item => {
      val course: TbStuCourse = JSON.toJavaObject(JSON.parseObject(item), classOf[TbStuCourse])
      (course.stuId, course)
    })
    // full outer join of the two streams
    val fullJoin: DStream[(String, (Option[TbStu], Option[TbStuCourse]))] = stuIdAndStu.fullOuterJoin(stuIdAndCourse)
    val resStuWide: DStream[StuWide] = fullJoin.mapPartitions(iter => {
      val jedis: Jedis = OffsetManagerUtil.jedisPool.getResource
      val res: ListBuffer[StuWide] = ListBuffer[StuWide]()
      for ((stuId, (stu, course)) <- iter) {
        if (stu.isDefined) {
          // the stu record arrived
          if (course.isDefined) {
            // and the course record arrived too
            val resItem: StuWide = StuAndStuScore.getStuWide(stu.get, course.get)
            res.append(resItem)
          }
          // whether or not the course arrived, cache the stu record in Redis for late-arriving data;
          // it is stored as a string under the key FullJoin:Stu:<stuId>
          val stuKey = s"FullJoin:Stu:${stu.get.id}"
          // cache the JSON for one day (adjust to your needs)
          val stuJsonCache: String = JSON.toJSONString(stu.get, JSON.DEFAULT_GENERATE_FEATURE)
          jedis.setex(stuKey, 3600 * 24, stuJsonCache)
          // the stu arrived, so also check the cache for course records that arrived earlier
          val couKey = s"FullJoin:Course:${stu.get.id}"
          val courseCacheDatas: util.Set[String] = jedis.smembers(couKey)
          val scala: mutable.Set[String] = courseCacheDatas.asScala
          for (elem <- scala) {
            val courseItem: TbStuCourse = JSON.toJavaObject(JSON.parseObject(elem), classOf[TbStuCourse])
            // combine the cached course records as well
            val stuRes: TbStu = stu.get
            val wide: StuWide = StuAndStuScore.getStuWide(stuRes, courseItem)
            res.append(wide)
          }
          // remove the course records that have now been handled
          jedis.del(couKey)
        } else {
          // the stu record has not arrived; cache the score records in a Redis set
          val courseKey = s"FullJoin:Course:${course.get.stuId}"
          val courseJsonCache: String = JSON.toJSONString(course.get, JSON.DEFAULT_GENERATE_FEATURE)
          jedis.sadd(courseKey, courseJsonCache)
        }
      }
      // release the connection
      jedis.close()
      res.iterator
    })
    resStuWide.foreachRDD(rdd => {
      rdd.foreach(item => {
        println("===========")
        println(item)
        val stuWide: String = JSON.toJSONString(item, JSON.DEFAULT_GENERATE_FEATURE)
        MyKafkaSinkUtil.send("dws_tb_stuwide", stuWide)
      })
      // once everything has been written, save the offsets of the first stream
      OffsetManagerUtil.saveOffset(stuCourseTopicName, stuCourseGroupName, stuCourseOffsetRanges)
      // and commit its Kafka offsets manually
      stuCourseKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuCourseOffsetRanges)
      // do the same for the second stream
      OffsetManagerUtil.saveOffset(stuTopicName, stuGroupName, stuOffsetRanges)
      stuKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuOffsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Start each of the apps.

During the dual-stream join, stu records are cached in Redis as string keys of the form FullJoin:Stu:<stuId> (with a one-day TTL), and early-arriving score records as set members under FullJoin:Course:<stuId>.

Final result

Click Submit.

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic dws_tb_stuwide
{"aearName":"华北地区","areaCode":"0003","courseId":"5","courseName":"美术","id":"321020","score":"90","scoreid":"8e0276","stuId":"321020","stuName":"双流join缓存实现成功","tearchName":"小李老师"}

At this point the Kafka topics are: school_db, ods_tb_stu, ods_tb_stu_course, ods_tb_tearch, ods_tb_course, ods_tb_area, dwd_tb_stu, dwd_tb_stu_course and dws_tb_stuwide.

ADS Layer

Continuously compute the total score per region and save it to MySQL in real time so it can be displayed.

Here we store the offsets in MySQL.

Create the tables; the second one is the aggregate table.


  
  1. CREATE TABLE ` offset` (
  2. `group_id` varchar( 200) NOT NULL,
  3. `topic` varchar( 200) NOT NULL,
  4. `partition_id` int( 11) NOT NULL,
  5. `topic_offset` bigint( 20) DEFAULT NULL,
  6. PRIMARY KEY (`group_id`,`topic`,`partition_id`)
  7. ) ENGINE =InnoDB DEFAULT CHARSET =utf8;
  8. CREATE TABLE `aear_sumscore`
  9. (
  10. `aear_name` varchar( 20),
  11. `sum_score` decimal( 16, 2)
  12. )ENGINE =InnoDB DEFAULT CHARSET =utf8;

Exactly-once consumption with scalikejdbc

pom.xml


  
<!-- scala JDBC helper; makes transaction handling easy -->
<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc_2.12</artifactId>
    <version>3.4.0</version>
</dependency>
<!-- scalikejdbc-config -->
<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc-config_2.12</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

Configuration file

application.conf


  
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://master/school?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="root"
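The point of using scalikejdbc is that the aggregated rows and the new offsets are written inside one local transaction, so either both succeed or both roll back. A minimal sketch of that pattern is below; the table and column names are the ones created above, the bound values are placeholders, and the full version is the Ads app further down.

import scalikejdbc._
import scalikejdbc.config.DBs

object LocalTxSketch {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reads db.default.* from application.conf
    DB.localTx { implicit session =>
      // 1. write one aggregated row (placeholder values)
      SQL("insert into aear_sumscore(aear_name,sum_score) values(?,?)")
        .bind("华东地区", 120).update().apply()
      // 2. write the offset for (group, topic, partition) in the same transaction (offset value is a placeholder)
      SQL("replace into offset values(?,?,?,?)")
        .bind("ads_tb_stuwide", "dws_tb_stuwide", 0, 42L).update().apply()
    }
  }
}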

Offset management utility (MySQL)


  
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable

object OffsetManagerM {
  def getOffset(topic: String, consumerGroupId: String): mutable.Map[TopicPartition, Long] = {
    val sql = " select group_id,topic,topic_offset,partition_id from offset" +
      " where topic='" + topic + "' and group_id='" + consumerGroupId + "'"
    val jsonObjList: List[JSONObject] = MySQLUtil.queryList(sql)
    val res: mutable.Map[TopicPartition, Long] = mutable.Map()
    jsonObjList.map {
      jsonObj => {
        val topicPartition: TopicPartition = new TopicPartition(topic, jsonObj.getIntValue("partition_id"))
        val offset: Long = jsonObj.getLongValue("topic_offset")
        res.put(topicPartition, offset)
      }
    }
    res
  }
}

MySQLUtil


  
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

object MySQLUtil {
  def main(args: Array[String]): Unit = {
    val list: List[JSONObject] = queryList("select * from tb_stu")
    println(list)
  }

  def queryList(sql: String): List[JSONObject] = {
    Class.forName("com.mysql.jdbc.Driver")
    val resultList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]()
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://master:3306/school?characterEncoding=utf-8&useSSL=false",
      "root",
      "root"
    )
    val stat: Statement = conn.createStatement
    println(sql)
    val rs: ResultSet = stat.executeQuery(sql)
    val md: ResultSetMetaData = rs.getMetaData
    while (rs.next) {
      val rowData = new JSONObject()
      for (i <- 1 to md.getColumnCount) {
        rowData.put(md.getColumnName(i), rs.getObject(i))
      }
      resultList += rowData
    }
    stat.close()
    conn.close()
    resultList.toList
  }
}

Finally we get the statistic: the scores contributed by students, aggregated per region in real time.

AdsApp


  
object Ads {
  def main(args: Array[String]): Unit = {
    // set up the stream
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("TrademarkStatApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val groupId = "ads_tb_stuwide"
    val topic = "dws_tb_stuwide"
    // read the offsets from MySQL
    val offsetMapForKafka: mutable.Map[TopicPartition, Long] = OffsetManagerM.getOffset(topic, groupId)
    // pass the offsets to Kafka and load the stream
    var recordInputDstream: InputDStream[ConsumerRecord[String, String]] = null
    if (offsetMapForKafka != null && offsetMapForKafka.size > 0) {
      recordInputDstream = MyKafkaUtilDwdTbStu.getInputDStreamByMapTopicPartition(topic, offsetMapForKafka, ssc)
    } else {
      recordInputDstream = MyKafkaUtilDwdTbStu.getInputDStreamByDefault(topic, ssc)
    }
    // capture the offset ranges of this batch
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
      recordInputDstream.transform {
        rdd => {
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }
      }
    // extract the data
    val stuWideDstream: DStream[StuWide] = inputGetOffsetDstream.map {
      record => {
        val jsonString: String = record.value()
        // the wide record produced by the DWS layer
        val stuWide: StuWide = JSON.parseObject(jsonString, classOf[StuWide])
        stuWide
      }
    }
    // aggregate
    val res: DStream[(String, Int)] = stuWideDstream.map(item => {
      (item.aearName, item.score.toInt)
    }).reduceByKey(_ + _)
    // store the results and the offsets in MySQL; a local transaction keeps the consumption exactly-once
    res.foreachRDD {
      rdd => {
        val aearSum: Array[(String, Int)] = rdd.collect()
        if (aearSum != null && aearSum.size > 0) {
          DBs.setup()
          DB.localTx {
            implicit session => {
              // write the aggregated results
              val batchParamsList: ListBuffer[Seq[Any]] = ListBuffer[Seq[Any]]()
              for ((aearName, sumScore) <- aearSum) {
                // one row per region
                batchParamsList.append(Seq(aearName, sumScore))
              }
              SQL("insert into aear_sumscore(aear_name,sum_score) values(?,?)")
                .batch(batchParamsList.toSeq: _*).apply()
              // throw new RuntimeException("test exception")
              // write the offsets in the same transaction
              for (offsetRange <- offsetRanges) {
                val partitionId: Int = offsetRange.partition
                val untilOffset: Long = offsetRange.untilOffset
                SQL("replace into offset values(?,?,?,?)").bind(groupId,
                  topic, partitionId, untilOffset).update().apply()
              }
            }
          }
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}


Reposted from: https://blog.csdn.net/S1124654/article/details/125776790