小言_互联网的博客

[A Complete Record from Scratch] Crontab + Flume + Kafka + Spark Streaming + Spring Boot: A Page-View Statistics Project


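1. Requirements

1.1 Requirements

Total page views of the site up to now

Page views referred by search engines up to now

The overall project architecture is shown in the figure:

1.2 User behavior log contents

2. Generating simulated log data

The simulated data is generated with Python and contains:

  • different URL paths -> url_paths
  • different referer URLs -> http_refers
  • different search keywords -> search_keyword
  • different status codes -> status_codes
  • different IP address segments -> ip_slices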

  
    # coding=UTF-8
    import random
    import time

    url_paths = [
        "class/112.html",
        "class/128.html",
        "class/145.html",
        "class/146.html",
        "class/131.html",
        "class/130.html",
        "class/145.html",
        "learn/821.html",
        "learn/825.html",
        "course/list"
    ]

    http_refers = [
        "http://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "http://cn.bing.com/search?q={query}",
        "http://search.yahoo.com/search?p={query}",
    ]

    search_keyword = [
        "Spark+Sql",
        "Hadoop",
        "Storm",
        "Spark+Streaming",
        "大数据",
        "面试"
    ]

    status_codes = ["200", "404", "500"]

    ip_slices = [132, 156, 132, 10, 29, 145, 44, 30, 21, 43, 1, 7, 9, 23, 55, 56, 241, 134, 155, 163, 172, 144, 158]


    def sample_url():
        return random.sample(url_paths, 1)[0]


    def sample_ip():
        # Join four randomly picked segments into a dotted IP address
        parts = random.sample(ip_slices, 4)
        return ".".join([str(item) for item in parts])


    def sample_refer():
        # About 80% of the lines carry no referer ("-")
        if random.uniform(0, 1) > 0.2:
            return "-"
        refer_str = random.sample(http_refers, 1)
        query_str = random.sample(search_keyword, 1)
        return refer_str[0].format(query=query_str[0])


    def sample_status():
        return random.sample(status_codes, 1)[0]


    def generate_log(count=10):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # "w+" truncates the file on every run; the with-block closes it when done
        with open("/home/hadoop/tpdata/project/logs/access.log", "w+") as f:
            while count >= 1:
                # Fields are tab-separated: ip, time, request, status, referer
                query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{refer}".format(
                    local_time=time_str,
                    url=sample_url(),
                    ip=sample_ip(),
                    refer=sample_refer(),
                    status=sample_status())
                print(query_log)
                f.write(query_log + "\n")
                count = count - 1


    if __name__ == '__main__':
        generate_log(100)
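The referer rule in the generator is easy to misread: a uniform draw greater than 0.2 yields "-", so only about one line in five carries a search-engine referer. The following is a minimal sketch that checks this share empirically. The shortened `refers` and `keywords` lists, the `rng` parameter, and the fixed seed are assumptions made for the illustration and are not part of the original script.

```python
import random


def sample_refer(rng, refers, keywords):
    """Same rule as the generator: a draw above 0.2 means 'no referer'."""
    if rng.uniform(0, 1) > 0.2:
        return "-"
    return rng.choice(refers).format(query=rng.choice(keywords))


# Shortened, illustrative lists (assumption: any templates with {query} work)
refers = ["http://www.baidu.com/s?wd={query}", "http://cn.bing.com/search?q={query}"]
keywords = ["Hadoop", "Storm"]

rng = random.Random(42)  # fixed seed so the run is repeatable
samples = [sample_refer(rng, refers, keywords) for _ in range(10000)]
share = sum(1 for s in samples if s != "-") / len(samples)
print("share of lines with a search referer: %.3f" % share)
```

With 100 lines per batch, this means roughly 20 lines per minute feed the search-engine statistic before the later `/class` filter is applied.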

Use the Linux crontab scheduler to produce one batch of data every minute.

Expression:

*/1 * * * *
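The first field of the expression is the minute, and `*/1` means "every value, with a step of 1", which is the same set of minutes as a plain `*`. Here is a small sketch of how such a step field expands. The helper `expand_field` is hypothetical and only handles `*`, `*/n` and a single number, not the full cron syntax.

```python
def expand_field(field, low, high):
    """Expand one cron field of the form '*', '*/n', or a single number."""
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(low, high + 1, step))
    return [int(field)]


every_minute = expand_field("*/1", 0, 59)
quarter_hours = expand_field("*/15", 0, 59)
print(len(every_minute), quarter_hours)
```

Changing the step (for example `*/5`) would be the way to generate batches less often.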

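Write a shell script that runs the Python generator:

    vi log_generator.sh
    # contents of log_generator.sh:
    python /home/hadoop/tpdata/log.py
    # back in the shell, make the script executable:
    chmod u+x log_generator.sh

Configure crontab:

    crontab -e
    */1 * * * * /home/hadoop/tpdata/project/log_generator.sh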

2. Collecting logs in real time with Flume

Component selection for development:

Write streaming_project.conf:

vi streaming_project.conf

    exec-memory-logger.sources = exec-source
    exec-memory-logger.sinks = logger-sink
    exec-memory-logger.channels = memory-channel

    exec-memory-logger.sources.exec-source.type = exec
    exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
    exec-memory-logger.sources.exec-source.shell = /bin/sh -c

    exec-memory-logger.channels.memory-channel.type = memory

    exec-memory-logger.sinks.logger-sink.type = logger

    exec-memory-logger.sources.exec-source.channels = memory-channel
    exec-memory-logger.sinks.logger-sink.channel = memory-channel
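A frequent mistake in a Flume agent file is a source or sink that points at a channel the agent never declares (note that sources use the plural key `channels`, sinks the singular `channel`). The sketch below is a hypothetical pre-flight check, not a Flume tool: `parse_properties` and `check_wiring` are names invented here, and the parser only understands simple `key = value` lines.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of sources/sinks that reference undeclared channels."""
    channels = set(props.get(agent + ".channels", "").split())
    problems = []
    for source in props.get(agent + ".sources", "").split():
        used = props.get("%s.sources.%s.channels" % (agent, source), "").split()
        if not used or not set(used) <= channels:
            problems.append("source %s -> %s" % (source, used))
    for sink in props.get(agent + ".sinks", "").split():
        used = props.get("%s.sinks.%s.channel" % (agent, sink), "")
        if used not in channels:
            problems.append("sink %s -> %r" % (sink, used))
    return problems


CONF = """
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-logger.channels.memory-channel.type = memory
exec-memory-logger.sinks.logger-sink.type = logger
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
"""

print(check_wiring(parse_properties(CONF), "exec-memory-logger"))
```

The agent name passed to `check_wiring` must be the same one given to `flume-ng agent --name`.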

Start Flume to test it:

    flume-ng agent \
    --name exec-memory-logger \
    --conf $FLUME_HOME/conf \
    --conf-file /home/hadoop/tpdata/project/streaming_project.conf \
    -Dflume.root.logger=INFO,console

3. Feeding the real-time log data into Kafka

Start ZooKeeper:

./zkServer.sh start

Start the Kafka server:

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

The server.properties used here:

    broker.id=0
    ############################# Socket Server Settings #############################
    listeners=PLAINTEXT://:9092
    host.name=hadoop000
    advertised.host.name=192.168.1.9
    advertised.port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    log.dirs=/home/hadoop/app/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    ############################# Log Retention Policy #############################
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    ############################# Zookeeper #############################
    zookeeper.connect=hadoop000:2181
    zookeeper.connection.timeout.ms=6000

Start a Kafka console consumer (this reuses an existing topic; create one if you do not have it):

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic

Modify the Flume configuration so that the sink writes to Kafka:

vi streaming_project2.conf

    exec-memory-kafka.sources = exec-source
    exec-memory-kafka.sinks = kafka-sink
    exec-memory-kafka.channels = memory-channel

    exec-memory-kafka.sources.exec-source.type = exec
    exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
    exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

    exec-memory-kafka.channels.memory-channel.type = memory

    exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
    exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
    exec-memory-kafka.sinks.kafka-sink.batchSize = 5
    exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

    exec-memory-kafka.sources.exec-source.channels = memory-channel
    exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start Flume:

    flume-ng agent \
    --name exec-memory-kafka \
    --conf $FLUME_HOME/conf \
    --conf-file /home/hadoop/tpdata/project/streaming_project2.conf \
    -Dflume.root.logger=INFO,console

The Kafka consumer receives the data:

4. Consuming the Kafka data with Spark Streaming

4.1 pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.taipark.spark</groupId>
      <artifactId>sparktrain</artifactId>
      <version>1.0</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.9.0.0</kafka.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
        <hbase.version>1.2.0-cdh5.7.0</hbase.version>
      </properties>
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
      </repositories>
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <!--
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>${kafka.version}</version>
        </dependency>
        -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>2.2.0</version>
        </dependency>
        <!-- Spark Streaming + Flume integration -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-flume_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming + Flume sink integration -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-flume-sink_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.5</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.13</version>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>2.6.5</version>
        </dependency>
        <dependency>
          <groupId>net.jpountz.lz4</groupId>
          <artifactId>lz4</artifactId>
          <version>1.3.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flume.flume-ng-clients</groupId>
          <artifactId>flume-ng-log4jappender</artifactId>
          <version>1.6.0</version>
        </dependency>
      </dependencies>
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

4.2 Connecting to Kafka

Create a Scala file, WebStatStreamingApp.scala, and first connect to Kafka in Direct mode:

    package com.taipark.spark.project

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Consume Kafka data with Spark Streaming
      */
    object WebStatStreamingApp {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val sparkConf = new SparkConf()
          .setAppName("WebStatStreamingApp")
          .setMaster("local[2]")
        // One batch every 60 seconds, matching the crontab interval
        val ssc = new StreamingContext(sparkConf, Seconds(60))

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val messages = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicSet
          )

        // Print the number of records received in each batch
        messages.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Set the program arguments:

hadoop000:9092 streamingtopic

Test the connection locally:

The connection works, so the business code for data cleansing (ETL) can now be written.

4.3 ETL

Create the utility class DateUtils.scala:

    package com.taipark.spark.project.utils

    import java.util.Date
    import org.apache.commons.lang3.time.FastDateFormat

    /**
      * Date and time utilities
      */
    object DateUtils {
      val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
      val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

      // Parse "yyyy-MM-dd HH:mm:ss" into epoch milliseconds
      def getTime(time: String) = {
        YYYYMMDDHHMMSS_FORMAT.parse(time).getTime
      }

      // Reformat into the compact form "yyyyMMddHHmmss" (seconds are kept)
      def parseToMinute(time: String) = {
        TARGET_FORMAT.format(new Date(getTime(time)))
      }

      def main(args: Array[String]): Unit = {
        // println(parseToMinute("2020-03-10 15:00:05"))
      }
    }
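To make the effect of DateUtils concrete, here is a Python sketch of the same conversion using the standard `datetime` module in place of FastDateFormat. The function name `parse_to_minute` simply mirrors the Scala method and is otherwise an assumption of this illustration.

```python
from datetime import datetime


def parse_to_minute(time_str):
    """Convert 'yyyy-MM-dd HH:mm:ss' into the compact 'yyyyMMddHHmmss' form."""
    parsed = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    return parsed.strftime("%Y%m%d%H%M%S")


compact = parse_to_minute("2020-03-10 15:00:05")
print(compact)       # 20200310150005
print(compact[:8])   # 20200310, the day part used later in the rowkeys
```

Despite its name, the method keeps the seconds. The later steps rely only on the first eight characters (the day).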

Create ClickLog.scala:

    package com.taipark.spark.project.domian

    /**
      * A cleansed log record
      */
    case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

Modify WebStatStreamingApp.scala:

    package com.taipark.spark.project.spark

    import com.taipark.spark.project.domian.ClickLog
    import com.taipark.spark.project.utils.DateUtils
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Consume Kafka data with Spark Streaming
      */
    object WebStatStreamingApp {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val sparkConf = new SparkConf()
          .setAppName("WebStatStreamingApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(60))

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val messages = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicSet
          )
        //messages.map(_._2).count().print()

        // ETL
        // 30.163.55.7 2020-03-10 14:32:01 "GET /class/112.html HTTP/1.1" 404 http://www.baidu.com/s?wd=Hadoop
        val logs = messages.map(_._2)
        val cleanData = logs.map(line => {
          val infos = line.split("\t")
          // infos(2) = "GET /class/112.html HTTP/1.1"
          val url = infos(2).split(" ")(1)
          var courseId = 0
          // Extract the course id from /class/<id>.html
          if (url.startsWith("/class")) {
            val courseIdHTML = url.split("/")(2)
            courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
          }
          ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
        }).filter(clicklog => clicklog.courseId != 0) // keep only /class pages
        cleanData.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
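The cleansing step above can be tried outside Spark. The following Python sketch applies the same rules to a single line: split on tabs, take the URL from the request field, keep only `/class/<id>.html` pages, and reformat the time. Unlike the Scala code, it returns `None` for malformed lines instead of throwing. That guard, and the names `parse_line` and `course_id`, are assumptions added for the illustration.

```python
from collections import namedtuple
from datetime import datetime

ClickLog = namedtuple("ClickLog", "ip time course_id status_code referer")


def parse_line(line):
    """Turn one tab-separated access-log line into a ClickLog, or None."""
    infos = line.split("\t")
    if len(infos) != 5:
        return None
    request = infos[2].split(" ")  # ['"GET', '/class/112.html', 'HTTP/1.1"']
    if len(request) < 2 or not request[1].startswith("/class/"):
        return None                # not a course page: filtered out
    page = request[1].split("/")[2]  # '112.html'
    try:
        course_id = int(page[:page.rfind(".")])
        status = int(infos[3])
        when = datetime.strptime(infos[1], "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
    return ClickLog(infos[0], when.strftime("%Y%m%d%H%M%S"), course_id, status, infos[4])


line = '30.163.55.7\t2020-03-10 14:32:01\t"GET /class/112.html HTTP/1.1"\t404\thttp://www.baidu.com/s?wd=Hadoop'
print(parse_line(line))
```

Lines for `learn/...` or `course/list` produce `None` here, which corresponds to the `courseId != 0` filter in the streaming job.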

Run it as a test:

ETL is done.

4.4 Feature 1: page views of the site up to now

A database stores the statistics, and the visualization front end looks up results by yyyyMMdd and courseid and displays them.

HBase is the database of choice. It needs HDFS and ZooKeeper to be running.

Start HDFS:

./start-dfs.sh

Start HBase:

./start-hbase.sh

    ./hbase shell
    list

HBase table design:

create 'web_course_clickcount','info'

    hbase(main):008:0> desc 'web_course_clickcount'
    Table web_course_clickcount is ENABLED
    web_course_clickcount
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
    1 row(s) in 0.1650 seconds

Rowkey design:

day_courseid

Use Scala to work with HBase:

Create the entity class for page clicks, CourseClickCount.scala:

    package com.taipark.spark.project.domian

    /**
      * Click count of a course page
      * @param day_course  the rowkey in HBase
      * @param click_count the total number of clicks
      */
    case class CourseClickCount(day_course: String, click_count: Long)

Create the data access layer, CourseClickCountDAO.scala:

    package com.taipark.spark.project.dao

    import com.taipark.spark.project.domian.CourseClickCount
    import scala.collection.mutable.ListBuffer

    object CourseClickCountDAO {
      val tableName = "web_course_clickcount"
      val cf = "info"
      val qualifer = "click_count"

      /**
        * Save data to HBase
        * @param list
        */
      def save(list: ListBuffer[CourseClickCount]): Unit = {
      }

      /**
        * Look up the value for a rowkey
        * @param day_course
        * @return
        */
      def count(day_course: String): Long = {
        0L
      }
    }

Implement HBaseUtils in Java to connect the DAO to HBase:

    package com.taipark.spark.project.utils;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.IOException;

    /**
     * HBase utility class: a Java helper wrapped as a singleton
     */
    public class HBaseUtils {
        HBaseAdmin admin = null;
        Configuration configuration = null;

        // Private constructor (singleton pattern)
        private HBaseUtils() {
            configuration = new Configuration();
            configuration.set("hbase.zookeeper.quorum",
                    "hadoop000:2181");
            configuration.set("hbase.rootdir",
                    "hdfs://hadoop000:8020/hbase");
            try {
                admin = new HBaseAdmin(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private static HBaseUtils instance = null;

        public static synchronized HBaseUtils getInstance() {
            if (instance == null) {
                instance = new HBaseUtils();
            }
            return instance;
        }

        // Get an HTable instance by table name (returns null if the lookup fails)
        public HTable getTable(String tableName) {
            HTable table = null;
            try {
                table = new HTable(configuration, tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return table;
        }

        /**
         * Add one record to an HBase table
         * @param tableName table name
         * @param rowkey    rowkey of the row
         * @param cf        column family
         * @param column    column qualifier
         * @param value     value to write to HBase
         */
        public void put(String tableName, String rowkey, String cf, String column, String value) {
            HTable table = getTable(tableName);
            Put put = new Put(Bytes.toBytes(rowkey));
            put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            // HTable hTable = HBaseUtils.getInstance().getTable("web_course_clickcount");
            // System.out.println(hTable.getName().getNameAsString());
            String tableName = "web_course_clickcount";
            String rowkey = "20200310_88";
            String cf = "info";
            String column = "click_count";
            String value = "2";
            HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
        }
    }

Test run:

Once the utility class works, continue with the DAO code:

    package com.taipark.spark.project.dao

    import com.taipark.spark.project.domian.CourseClickCount
    import com.taipark.spark.project.utils.HBaseUtils
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.mutable.ListBuffer

    object CourseClickCountDAO {
      val tableName = "web_course_clickcount"
      val cf = "info"
      val qualifer = "click_count"

      /**
        * Save data to HBase
        * @param list
        */
      def save(list: ListBuffer[CourseClickCount]): Unit = {
        val table = HBaseUtils.getInstance().getTable(tableName)
        for (ele <- list) {
          // Add this batch's count to the value already stored for the rowkey
          table.incrementColumnValue(
            Bytes.toBytes(ele.day_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            ele.click_count)
        }
      }

      /**
        * Look up the value for a rowkey
        * @param day_course
        * @return
        */
      def count(day_course: String): Long = {
        val table = HBaseUtils.getInstance().getTable(tableName)
        val get = new Get(Bytes.toBytes(day_course))
        val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
        if (value == null) {
          0L
        } else {
          Bytes.toLong(value)
        }
      }

      def main(args: Array[String]): Unit = {
        val list = new ListBuffer[CourseClickCount]
        list.append(CourseClickCount("20200311_8", 8))
        list.append(CourseClickCount("20200311_9", 9))
        list.append(CourseClickCount("20200311_10", 1))
        list.append(CourseClickCount("20200311_2", 15))
        save(list)
      }
    }
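One detail worth keeping in mind: the DAO stores counters through incrementColumnValue and reads them back with Bytes.toLong, which works on 8-byte big-endian longs, whereas the earlier HBaseUtils.put test wrote the string "2". The sketch below uses Python's `struct` module to show the difference between the two byte layouts. The helper names are hypothetical and nothing here talks to HBase.

```python
import struct


def long_to_bytes(value):
    """Encode a signed 64-bit integer as 8 big-endian bytes."""
    return struct.pack(">q", value)


def bytes_to_long(raw):
    """Decode 8 big-endian bytes; anything else is not a counter cell."""
    if len(raw) != 8:
        raise ValueError("expected 8 bytes, got %d" % len(raw))
    return struct.unpack(">q", raw)[0]


counter_cell = long_to_bytes(2)     # what a counter increment would store
string_cell = "2".encode("utf-8")   # what put(..., "2") stores

print(len(counter_cell), bytes_to_long(counter_cell))
print(len(string_cell))
```

A row written as a string therefore cannot be read back as a counter, so it is safer to keep string puts and counter increments on separate rows or columns.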

Run it as a test and check the result in the hbase shell:

scan 'web_course_clickcount'

Write the Spark Streaming results to HBase:

    package com.taipark.spark.project.spark

    import com.taipark.spark.project.dao.CourseClickCountDAO
    import com.taipark.spark.project.domian.{ClickLog, CourseClickCount}
    import com.taipark.spark.project.utils.DateUtils
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.ListBuffer

    /**
      * Consume Kafka data with Spark Streaming
      */
    object WebStatStreamingApp {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val sparkConf = new SparkConf()
          .setAppName("WebStatStreamingApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(60))

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val messages = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicSet
          )
        //messages.map(_._2).count().print()

        // ETL
        // 30.163.55.7 2020-03-10 14:32:01 "GET /class/112.html HTTP/1.1" 404 http://www.baidu.com/s?wd=Hadoop
        val logs = messages.map(_._2)
        val cleanData = logs.map(line => {
          val infos = line.split("\t")
          // infos(2) = "GET /class/112.html HTTP/1.1"
          val url = infos(2).split(" ")(1)
          var courseId = 0
          // Extract the course id from /class/<id>.html
          if (url.startsWith("/class")) {
            val courseIdHTML = url.split("/")(2)
            courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
          }
          ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
        }).filter(clicklog => clicklog.courseId != 0)
        // cleanData.print()

        cleanData.map(x => {
          // HBase rowkey design: 20200311_9
          ((x.time.substring(0, 8)) + "_" + x.courseId, 1)
        }).reduceByKey(_ + _).foreachRDD(rdd => {
          // One DAO call per partition rather than per record
          rdd.foreachPartition(partitionRecords => {
            val list = new ListBuffer[CourseClickCount]
            partitionRecords.foreach(pair => {
              list.append(CourseClickCount(pair._1, pair._2))
            })
            CourseClickCountDAO.save(list)
          })
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
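The counting logic in this version has two layers: within a batch, records are grouped by rowkey and summed (the `map` plus `reduceByKey`), and across batches the per-batch sums are added to what is already stored (the increment in the DAO). Here is a minimal Python sketch of both layers, with a plain dict standing in for the HBase table. The names `rowkey`, `count_batch` and `merge_into_store` are hypothetical.

```python
from collections import Counter


def rowkey(time_str, course_id):
    """Build the day_courseid rowkey from 'yyyyMMddHHmmss' and a course id."""
    return "%s_%d" % (time_str[:8], course_id)


def count_batch(records):
    """Per-batch aggregation: one (rowkey, count) entry per distinct key."""
    counts = Counter()
    for time_str, course_id in records:
        counts[rowkey(time_str, course_id)] += 1
    return counts


def merge_into_store(store, batch_counts):
    """Cross-batch accumulation, standing in for a counter increment."""
    for key, delta in batch_counts.items():
        store[key] = store.get(key, 0) + delta


store = {}
batch1 = [("20200311143201", 112), ("20200311143201", 112), ("20200311143201", 128)]
batch2 = [("20200311143301", 112)]
for batch in (batch1, batch2):
    merge_into_store(store, count_batch(batch))
print(store)
```

Because the store only ever receives increments, the value under each rowkey is the running total for that day and course, which is what the "up to now" requirement asks for.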

Test:

4.5 Feature 2: page views referred by search engines up to now

HBase table design:

create 'web_course_search_clickcount','info'

Rowkey design:

day_search_1

Define the entity class:

    package com.taipark.spark.project.domian

    /**
      * Entity class for clicks that arrive from search engines
      * @param day_search_course
      * @param click_count
      */
    case class CourseSearchClickCount(day_search_course: String, click_count: Long)

Develop the DAO, CourseSearchClickCountDAO.scala:

    package com.taipark.spark.project.dao

    import com.taipark.spark.project.domian.CourseSearchClickCount
    import com.taipark.spark.project.utils.HBaseUtils
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.mutable.ListBuffer

    object CourseSearchClickCountDAO {
      val tableName = "web_course_search_clickcount"
      val cf = "info"
      val qualifer = "click_count"

      /**
        * Save data to HBase
        * @param list
        */
      def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
        val table = HBaseUtils.getInstance().getTable(tableName)
        for (ele <- list) {
          table.incrementColumnValue(
            Bytes.toBytes(ele.day_search_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            ele.click_count)
        }
      }

      /**
        * Look up the value for a rowkey
        * @param day_search_course
        * @return
        */
      def count(day_search_course: String): Long = {
        val table = HBaseUtils.getInstance().getTable(tableName)
        val get = new Get(Bytes.toBytes(day_search_course))
        val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
        if (value == null) {
          0L
        } else {
          Bytes.toLong(value)
        }
      }

      def main(args: Array[String]): Unit = {
        val list = new ListBuffer[CourseSearchClickCount]
        list.append(CourseSearchClickCount("20200311_www.baidu.com_8", 8))
        list.append(CourseSearchClickCount("20200311_cn.bing.com_9", 9))
        save(list)
        // Read back one of the rowkeys that was just saved
        println(count("20200311_www.baidu.com_8"))
      }
    }

Test:

Write to HBase from Spark Streaming:

    package com.taipark.spark.project.spark

    import com.taipark.spark.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
    import com.taipark.spark.project.domian.{ClickLog, CourseClickCount, CourseSearchClickCount}
    import com.taipark.spark.project.utils.DateUtils
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.ListBuffer

    /**
      * Consume Kafka data with Spark Streaming
      */
    object WebStatStreamingApp {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args

        val sparkConf = new SparkConf()
          .setAppName("WebStatStreamingApp")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(60))

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val messages = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicSet
          )
        //messages.map(_._2).count().print()

        // ETL
        // 30.163.55.7 2020-03-10 14:32:01 "GET /class/112.html HTTP/1.1" 404 http://www.baidu.com/s?wd=Hadoop
        val logs = messages.map(_._2)
        val cleanData = logs.map(line => {
          val infos = line.split("\t")
          // infos(2) = "GET /class/112.html HTTP/1.1"
          val url = infos(2).split(" ")(1)
          var courseId = 0
          // Extract the course id from /class/<id>.html
          if (url.startsWith("/class")) {
            val courseIdHTML = url.split("/")(2)
            courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
          }
          ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
        }).filter(clicklog => clicklog.courseId != 0)
        // cleanData.print()

        // Requirement 1
        cleanData.map(x => {
          // HBase rowkey design: 20200311_9
          ((x.time.substring(0, 8)) + "_" + x.courseId, 1)
        }).reduceByKey(_ + _).foreachRDD(rdd => {
          rdd.foreachPartition(partitionRecords => {
            val list = new ListBuffer[CourseClickCount]
            partitionRecords.foreach(pair => {
              list.append(CourseClickCount(pair._1, pair._2))
            })
            CourseClickCountDAO.save(list)
          })
        })

        // Requirement 2
        cleanData.map(x => {
          // http://www.baidu.com/s?wd=Spark+Streaming
          val referer = x.referer.replaceAll("//", "/")
          // http:/www.baidu.com/s?wd=Spark+Streaming
          val splits = referer.split("/")
          var host = ""
          // A referer of "-" gives splits.length == 1 and leaves host empty
          if (splits.length > 2) {
            host = splits(1)
          }
          (host, x.courseId, x.time)
        }).filter(_._1 != "").map(x => {
          // Rowkey: day_host_courseid
          (x._3.substring(0, 8) + "_" + x._1 + "_" + x._2, 1)
        }).reduceByKey(_ + _).foreachRDD(rdd => {
          rdd.foreachPartition(partitionRecords => {
            val list = new ListBuffer[CourseSearchClickCount]
            partitionRecords.foreach(pair => {
              list.append(CourseSearchClickCount(pair._1, pair._2))
            })
            CourseSearchClickCountDAO.save(list)
          })
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
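The host extraction for requirement 2 is done above by collapsing `//` and splitting on `/`. As an illustration of the same idea, the Python sketch below swaps in the standard `urllib.parse.urlparse` instead of that string manipulation, then builds the day_host_courseid rowkey. The function names are hypothetical, and returning `None` for lines without a referer is a choice made for this sketch.

```python
from urllib.parse import urlparse


def referer_host(referer):
    """Return the host of a referer URL, or '' when there is none ('-')."""
    if referer == "-":
        return ""
    return urlparse(referer).netloc


def search_rowkey(time_str, referer, course_id):
    """Build 'day_host_courseid', or None when the click has no referer host."""
    host = referer_host(referer)
    if not host:
        return None
    return "%s_%s_%d" % (time_str[:8], host, course_id)


key = search_rowkey("20200311143201", "http://www.baidu.com/s?wd=Spark+Streaming", 112)
print(key)
print(search_rowkey("20200311143201", "-", 112))
```

Both approaches yield `www.baidu.com` for the sample referer. The URL parser is somewhat more forgiving of unusual inputs than a fixed split index.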

Test:

5. Deploying to production

Avoid hardcoding: comment out setAppName and setMaster so that they can be passed in by spark-submit:

    val sparkConf = new SparkConf()
    //  .setAppName("WebStatStreamingApp")
    //  .setMaster("local[2]")

Before packaging with Maven for deployment, comment out the two lines in the pom that specify the build source directories, to avoid errors:

    <!--
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    -->

Package with Maven and upload the jar to the server:

Submit it with spark-submit:

    ./spark-submit \
    --master local[5] \
    --name WebStatStreamingApp \
    --class com.taipark.spark.project.spark.WebStatStreamingApp \
    /home/hadoop/tplib/sparktrain-1.0.jar \
    hadoop000:9092 streamingtopic

This fails with:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$

Fix: add the spark-streaming-kafka-0-8_2.11 package:

    ./spark-submit \
    --master local[5] \
    --name WebStatStreamingApp \
    --class com.taipark.spark.project.spark.WebStatStreamingApp \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
    /home/hadoop/tplib/sparktrain-1.0.jar \
    hadoop000:9092 streamingtopic

This fails with:

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/HBaseAdmin

Fix: add the HBase jars:

    ./spark-submit \
    --master local[5] \
    --name WebStatStreamingApp \
    --class com.taipark.spark.project.spark.WebStatStreamingApp \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
    --jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
    /home/hadoop/tplib/sparktrain-1.0.jar \
    hadoop000:9092 streamingtopic
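The `--jars` option expects a single comma-separated list, which is why the command expands the `*.jar` glob with `echo` and then turns the separating spaces into commas with `tr`. Below is a Python sketch of the same transformation, run against a temporary directory so that it is self-contained. The file names in it are made up for the demonstration.

```python
import glob
import os
import tempfile


def jars_argument(lib_dir):
    """Join every *.jar under lib_dir into one comma-separated string."""
    return ",".join(sorted(glob.glob(os.path.join(lib_dir, "*.jar"))))


with tempfile.TemporaryDirectory() as lib_dir:
    # Hypothetical files standing in for the contents of an HBase lib directory
    for name in ("hbase-client.jar", "hbase-common.jar", "notes.txt"):
        open(os.path.join(lib_dir, name), "w").close()
    arg = jars_argument(lib_dir)

print(arg)
```

One caveat of the shell form: a path containing spaces would also be split by `tr`. The directory used in this post has no such path.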

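Run:

The job runs successfully in the background.

6. Spring Boot development

6.1 Testing ECharts

Create a new Spring Boot project, download ECharts through its online custom build to obtain echarts.min.js, and place the file under resources/static/js.

Add one dependency to pom.xml:

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>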

Create test.html under resources/templates:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>test</title>
        <!-- Include the ECharts file -->
        <script src="js/echarts.min.js"></script>
    </head>
    <body>
        <!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
        <div id="main" style="width: 600px;height:400px;"></div>
        <script type="text/javascript">
            // Initialize the echarts instance on the prepared DOM element
            var myChart = echarts.init(document.getElementById('main'));
            // Specify the chart options and data
            var option = {
                title: {
                    text: 'ECharts Getting Started Example'
                },
                tooltip: {},
                legend: {
                    data: ['Sales']
                },
                xAxis: {
                    data: ["Shirts", "Cardigans", "Chiffon shirts", "Trousers", "High heels", "Socks"]
                },
                yAxis: {},
                series: [{
                    name: 'Sales',
                    type: 'bar',
                    data: [5, 20, 36, 10, 10, 20]
                }]
            };
            // Render the chart with the options and data specified above
            myChart.setOption(option);
        </script>
    </body>
    </html>

Create a Java file:

    package com.taipark.spark.web;

    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;

    /**
     * Test controller
     */
    @RestController
    public class HelloBoot {
        @RequestMapping(value = "/hello", method = RequestMethod.GET)
        public String sayHello() {
            return "HelloWorld!";
        }

        // Renders the template resources/templates/test.html
        @RequestMapping(value = "/first", method = RequestMethod.GET)
        public ModelAndView firstDemo() {
            return new ModelAndView("test");
        }
    }

Test it:

Success.

6.2 Dynamic ECharts

Add the repository:


  
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

Add the dependency:


  
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.7.0</version>
    </dependency>

Create HBaseUtils.java:


  
    package com.taipark.spark.web.utils;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class HBaseUtils {
        HBaseAdmin admin = null;
        Configuration configuration = null;

        // Private constructor (singleton pattern)
        private HBaseUtils() {
            configuration = new Configuration();
            configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
            configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
            try {
                admin = new HBaseAdmin(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private static HBaseUtils instance = null;

        public static synchronized HBaseUtils getInstance() {
            if (instance == null) {
                instance = new HBaseUtils();
            }
            return instance;
        }

        // Get an HTable instance by table name
        public HTable getTable(String tableName) {
            HTable table = null;
            try {
                table = new HTable(configuration, tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return table;
        }

        /**
         * Query click counts from HBase by table name and row-key prefix.
         * @param tableName table to scan
         * @param condition row-key prefix, e.g. a day such as 20200311
         * @return map from row key to click count
         */
        public Map<String, Long> query(String tableName, String condition) throws Exception {
            Map<String, Long> map = new HashMap<>();
            String cf = "info";
            String qualifier = "click_count";
            Scan scan = new Scan();
            Filter filter = new PrefixFilter(Bytes.toBytes(condition));
            scan.setFilter(filter);
            // try-with-resources closes the scanner and the table when the scan is done
            try (HTable table = getTable(tableName);
                 ResultScanner rs = table.getScanner(scan)) {
                for (Result result : rs) {
                    String row = Bytes.toString(result.getRow());
                    long clickCount = Bytes.toLong(
                            result.getValue(Bytes.toBytes(cf), Bytes.toBytes(qualifier)));
                    map.put(row, clickCount);
                }
            }
            return map;
        }

        public static void main(String[] args) throws Exception {
            Map<String, Long> map = HBaseUtils.getInstance().query("web_course_clickcount", "20200311");
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        }
    }

The test passes.
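
To make the query in HBaseUtils easier to follow: HBase keeps rows sorted by row key, PrefixFilter keeps only rows whose key starts with the given bytes, and Bytes.toLong reads an 8-byte big-endian value. The sketch below mimics both ideas with a plain TreeMap and ByteBuffer. It is an in-memory illustration only, not the HBase client API; the class PrefixScanSketch, its methods, and the sample rows (in an assumed day_courseId row-key layout) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for a sorted HBase table; this is NOT the HBase client API.
final class PrefixScanSketch {

    // Encode a long as 8 big-endian bytes, the layout Bytes.toLong reads.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Return every row whose key starts with the prefix, in key order.
    static Map<String, Long> queryByPrefix(TreeMap<String, byte[]> table, String prefix) {
        Map<String, Long> result = new LinkedHashMap<>();
        // tailMap jumps to the first key >= prefix; matching keys are contiguous,
        // so the loop can stop at the first key that lacks the prefix.
        for (Map.Entry<String, byte[]> row : table.tailMap(prefix, true).entrySet()) {
            if (!row.getKey().startsWith(prefix)) {
                break;
            }
            result.put(row.getKey(), decodeLong(row.getValue()));
        }
        return result;
    }

    // Hypothetical sample rows using an assumed day_courseId row key.
    static TreeMap<String, byte[]> sampleTable() {
        TreeMap<String, byte[]> table = new TreeMap<>();
        table.put("20200310_112", encodeLong(7L));
        table.put("20200311_112", encodeLong(42L));
        table.put("20200311_128", encodeLong(5L));
        table.put("20200312_145", encodeLong(9L));
        return table;
    }

    public static void main(String[] args) {
        // prints {20200311_112=42, 20200311_128=5}
        System.out.println(queryByPrefix(sampleTable(), "20200311"));
    }
}
```

Only the two rows for 20200311 come back, which is the behavior the day-based query relies on.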

Define the page-visit count bean:


  
    package com.taipark.spark.web.domain;

    import org.springframework.stereotype.Component;

    /**
     * Page-visit count entity
     */
    @Component
    public class CourseClickCount {
        private String name;
        private long value;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }

DAO layer:


  
    package com.taipark.spark.web.dao;

    import com.taipark.spark.web.domain.CourseClickCount;
    import com.taipark.spark.web.utils.HBaseUtils;
    import org.springframework.stereotype.Component;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    /**
     * Data access layer for page-visit counts
     */
    @Component
    public class CourseClickDAO {

        /**
         * Query by day.
         * @param day day in yyyyMMdd form, used as the row-key prefix
         * @return one CourseClickCount per matching row
         * @throws Exception if the HBase query fails
         */
        public List<CourseClickCount> query(String day) throws Exception {
            List<CourseClickCount> list = new ArrayList<>();
            // Look up the visit counts for the given day in the HBase table
            // (the original hard-coded "20200311" here and ignored the parameter)
            Map<String, Long> map = HBaseUtils.getInstance().query("web_course_clickcount", day);
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                CourseClickCount model = new CourseClickCount();
                model.setName(entry.getKey());
                model.setValue(entry.getValue());
                list.add(model);
            }
            return list;
        }

        public static void main(String[] args) throws Exception {
            CourseClickDAO dao = new CourseClickDAO();
            List<CourseClickCount> list = dao.query("20200311");
            for (CourseClickCount model : list) {
                System.out.println(model.getName() + ":" + model.getValue());
            }
        }
    }
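
The day passed to CourseClickDAO.query is a yyyyMMdd string such as 20200311. If a caller wants the current day rather than a fixed literal, one possible way to build that string is the BASIC_ISO_DATE formatter from java.time. A minimal sketch; the class DayKeySketch and the method dayKey are hypothetical names, not part of the project.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns a date into the yyyyMMdd string used as the day prefix.
final class DayKeySketch {

    // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20200311.
    static String dayKey(LocalDate date) {
        return date.format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    public static void main(String[] args) {
        System.out.println(dayKey(LocalDate.of(2020, 3, 11))); // prints 20200311
        System.out.println(dayKey(LocalDate.now()));           // today's key
    }
}
```

The result could then be handed to the DAO, for example dao.query(DayKeySketch.dayKey(LocalDate.now())).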

To use JSON, add:


  
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>

Web layer:


  
    package com.taipark.spark.web.spark;

    import com.taipark.spark.web.dao.CourseClickDAO;
    import com.taipark.spark.web.domain.CourseClickCount;
    import net.sf.json.JSONArray;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Web layer
     */
    @RestController
    public class WebStatApp {

        // Page id -> display title
        private static Map<String, String> courses = new HashMap<>();

        static {
            courses.put("112", "How little do some foreigners know about China?");
            courses.put("128", "Which buildings do you consider failures?");
            courses.put("145", "Why can't humans imagine four-dimensional space?");
            courses.put("146", "Which avatars look pleasant at first glance?");
            courses.put("131", "What should a girlfriend do when her boyfriend is in a bad mood?");
            courses.put("130", "How can a beginner start running a WeChat official account from scratch?");
            courses.put("821", "Why do some people dislike minimalism?");
            courses.put("825", "Which books make you regret not reading them sooner?");
        }

    //    @Autowired
    //    CourseClickDAO courseClickDAO;
    //    @RequestMapping(value = "/course_clickcount_dynamic",method = RequestMethod.GET)
    //    public ModelAndView courseClickCount() throws Exception{
    //        ModelAndView view = new ModelAndView("index");
    //        List<CourseClickCount> list = courseClickDAO.query("20200311");
    //
    //        for(CourseClickCount model:list){
    //            model.setName(courses.get(model.getName().substring(9)));
    //        }
    //        JSONArray json = JSONArray.fromObject(list);
    //
    //        view.addObject("data_json",json);
    //
    //        return view;
    //    }

        @Autowired
        CourseClickDAO courseClickDAO;

        @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.POST)
        @ResponseBody
        public List<CourseClickCount> courseClickCount() throws Exception {
            List<CourseClickCount> list = courseClickDAO.query("20200311");
            for (CourseClickCount model : list) {
                // Drop the first 9 characters of the row key (the day plus a separator) to get the page id
                model.setName(courses.get(model.getName().substring(9)));
            }
            return list;
        }

        @RequestMapping(value = "/echarts", method = RequestMethod.GET)
        public ModelAndView echarts() {
            return new ModelAndView("echarts");
        }
    }
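
The controller recovers the page id with substring(9), which relies on every row key being an 8-digit day plus one separator character, and courses.get returns null for ids missing from the map. A slightly more defensive variant might split on an underscore and fall back to a placeholder title. This is a sketch under those assumptions (including the underscore separator); CourseNameSketch, courseId and displayName are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: maps a row key such as 20200311_112 to a display title.
final class CourseNameSketch {

    // Take everything after the first underscore; return the key unchanged if there is none.
    static String courseId(String rowKey) {
        int sep = rowKey.indexOf('_');
        return sep < 0 ? rowKey : rowKey.substring(sep + 1);
    }

    // Look up the title, falling back to a placeholder instead of null.
    static String displayName(Map<String, String> courses, String rowKey) {
        String id = courseId(rowKey);
        return courses.getOrDefault(id, "unknown (" + id + ")");
    }

    public static void main(String[] args) {
        Map<String, String> courses = new HashMap<>();
        courses.put("112", "Course A"); // sample title, not from the project
        System.out.println(displayName(courses, "20200311_112")); // prints Course A
        System.out.println(displayName(courses, "20200311_999")); // prints unknown (999)
    }
}
```

With a fallback like this, a page id that is not in the map still gets a readable label in the pie chart rather than a null name.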

Download jQuery, put it under static/js, and create echarts.html:


  
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>web_stat</title>
        <!-- Load ECharts and jQuery -->
        <script src="js/echarts.min.js"></script>
        <script src="js/jquery-3.4.1.min.js"></script>
    </head>
    <body>
    <!-- A DOM container with an explicit width and height for ECharts -->
    <div id="main" style="width: 960px;height:540px;position: absolute;top:50%;left:50%;margin-top: -200px;margin-left: -300px;"></div>
    <script type="text/javascript">
        // Initialize the ECharts instance on the prepared DOM node
        var myChart = echarts.init(document.getElementById('main'));
        var option = {
            title: {
                text: 'Real-Time Stream Processing Visit Statistics for a Site',
                subtext: 'Page visit counts',
                left: 'center'
            },
            tooltip: {
                trigger: 'item',
                formatter: '{a} <br/>{b} : {c} ({d}%)'
            },
            legend: {
                orient: 'vertical',
                left: 'left'
            },
            series: [
                {
                    name: 'Visits',
                    type: 'pie',
                    radius: '55%',
                    center: ['50%', '60%'],
                    // Fetch the data synchronously so it is ready before setOption runs
                    data: (function () {
                        var datas = [];
                        $.ajax({
                            type: "POST",
                            url: "/taipark/course_clickcount_dynamic",
                            dataType: "json",
                            async: false,
                            success: function (result) {
                                for (var i = 0; i < result.length; i++) {
                                    datas.push({
                                        "value": result[i].value,
                                        "name": result[i].name
                                    });
                                }
                            }
                        });
                        return datas;
                    })(),
                    emphasis: {
                        itemStyle: {
                            shadowBlur: 10,
                            shadowOffsetX: 0,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    }
                }
            ]
        };
        // Render the chart with the options and data above
        myChart.setOption(option);
    </script>
    </body>
    </html>

Test it in the browser.
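
For reference, the success callback in echarts.html expects a JSON array of objects with name and value fields. Spring normally produces this from the returned List<CourseClickCount> on its own, so the hand-rolled serializer below is only meant to show the payload shape. PiePayloadSketch and the sample titles are hypothetical, and the escaping covers just quotes and backslashes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration of the JSON the pie chart reads; not how Spring serializes it.
final class PiePayloadSketch {

    // Minimal string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Build [{"name":...,"value":...}, ...] in the iteration order of the map.
    static String toJson(Map<String, Long> counts) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            array.add("{\"name\":" + quote(e.getKey()) + ",\"value\":" + e.getValue() + "}");
        }
        return array.toString();
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("Course A", 42L); // sample data, not from the project
        counts.put("Course B", 5L);
        // prints [{"name":"Course A","value":42},{"name":"Course B","value":5}]
        System.out.println(toJson(counts));
    }
}
```

Each object in the array becomes one slice of the pie: name is the label and value is the visit count.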

6.3 Deploying the Spring App to the Server

Package the project with Maven, upload the jar to the server, and start it:

java -jar web-0.0.1.jar

Done.


Reposted from: https://blog.csdn.net/qq_36329973/article/details/104738993