(五)SparkStreaming_DStream操作实战
1、添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0-cdh6.0.1</version>
</dependency>
2、利用sparkstreaming接收socket数据实现单词统计
[root@node01 ~]# yum -y install nc
[root@node01 ~]# nc -lk 9999
//复制一个ssh通道 查看运行的端口
[root@node01 ~]# netstat -nlp
tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 28112/nc
package cn.itcast.socket
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
//todo:利用saprkStreaming接受socket数据实现单词统计
object SparkStreamingSocket {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingSocket")
.setMaster("local[2]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext 需要一个SparkContext,
// 还有一个批处理时间间隔,每隔5秒处理上一个5秒的数据
val ssc = new StreamingContext(sc,Seconds(5))
//4.接收socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999) //提前在集群上开启
//5、切分每一行数据
val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
//6、每个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现1累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//8、打印输出
result.print()
//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
/*
运行的时候 在9999的node01 输入
check hello hadoop hadoop
hadoop hello
然后这里的终端就是接收到并且reduce(每5秒)
(check,1)
(hello,1)
(hadoop,2)
-------------------------------------------
Time: 1569500000000 ms
-------------------------------------------
(hadoop,1)
(hello,1)
*/
3、利用sparkstreaming接收socket数据实现所有批次单词统计的结果累加
package cn.itcast.socket
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
* [root@node01 ~]# nc -lk 9999
*/
object SparkStreamingSocketTotal {
/**
* 这个函数大改啊
* @param concurrentValues 当前批次相同的单词出现 的所有1 (hadoop,1) (hadoop,1) (hadoop,1)----->List(1,1,1)
* @param historyValues 在之前所有批次相同单词出现的总次数 (Option类型可以表示有值(some),或者没有值(None))
* @return
*/
def updateFunc(concurrentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
val newValues: Int = concurrentValues.sum+historyValues.getOrElse(0)
Some(newValues)
}
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingSocketTotal")
.setMaster("local[2]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext 需要一个SparkContext,
// 还有一个批处理时间间隔,每隔5秒处理上一个5秒的数据
val ssc = new StreamingContext(sc, Seconds(5))
//[修改2] 设置checkpoint目录,用于保存每一个单词之前的所有批次中出现的总次数
ssc.checkpoint("./day01_wordcount/data/socket")
//4.接收socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999) //提前在集群上开启
//5、切分每一行数据
val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
//6、每个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//7、[修改1]相同单词出现1累加
// val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
//8、打印输出
result.print()
//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
/*
在node01 疯狂输入
-------------------------------------------
Time: 156900030000 ms
-------------------------------------------
(check,3)
(,5)
(hello,3)
(hadoop,3)
(,1)
(where,3)
*/
4、利用sparkStreaming接收socket数据实现单词统计—使用开窗函数
package cn.itcast.socket
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
* 窗口长度 > 滑动窗口的时间间隔 ----数据会被重复处理
* < ----数据会丢失
* = -----数据不会丢失或重复处理
*/
object SparkStreamingSocketWindow {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingSocketWindow")
.setMaster("local[2]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext 需要一个SparkContext,
// 还有一个批处理时间间隔,每隔5秒处理上一个5秒的数据
val ssc = new StreamingContext(sc,Seconds(5))
//4.接收socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999) //提前在集群上开启
//5、切分每一行数据
val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
//6、每个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、[修改]相同单词出现1累加
// val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
/**
* reduceFunc: (v,v)=>v 就是一个函数
* windowDuration:Duration 窗口的长度
* slideDuration:Duration 滑动窗口的时间间隔,它表示每隔多久计算一次
*/
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(10))
//8、打印输出
result.print()
//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
5、利用sparkStreaming接收socket数据实现一定时间内热门词汇
-
1、代码开发
(跟取topN逻辑有点类似)
-
transform
- 实现把一个DStream转换成一个新的DStream,内部需要一个函数,函数的输入参数就是前面DStream中的rdd,函数的返回值是一个新的rdd。
-
foreachRDD
- 它是一个outputOperation,会触发任务的运行,需要一个函数,函数的输入参数还是前面的DStream中的rdd,最后的返回值Unit表示没有
package cn.itcast.socket
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
//todo:利用sparkStreaming接收socket数据实现一定时间内热门词汇
object SparkStreamingSocketWindowHotWord {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingSocketWindow")
.setMaster("local[2]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext 需要一个SparkContext,
// 还有一个批处理时间间隔,每隔5秒处理上一个5秒的数据
val ssc = new StreamingContext(sc,Seconds(5))
//4.接收socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999) //提前在集群上开启
//5、切分每一行数据
val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
//6、每个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//7、相同单词出现1累加
// val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
/**
* reduceFunc: (v,v)=>v 就是一个函数
* windowDuration:Duration 窗口的长度
* slideDuration:Duration 滑动窗口的时间间隔,它表示每隔多久计算一次
*/
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(10))
//[增加]按照单词出现的次序降序
val finalResult: DStream[(String, Int)] = result.transform(rdd => {
//可以使用rdd中排序的方法操作
val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
//取出出现次数最多的前3位
val top3: Array[(String, Int)] = sortRDD.take(3)
//打印
println("---------------------top3 start----------------------")
top3.foreach(println)
println("--------------------top3 end--------------------------")
sortRDD
})
//8、[修改]打印输出
finalResult.print()
//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
/*
输入
[root@node01 sbin]# nc -lk 9999
hadoop spark hello hive hadoop spark hive hbase
输出
---------------------top3 start----------------------
(hive,2)
(spark,2)
(hadoop,2)
--------------------top3 end--------------------------
-------------------------------------------
Time: 1569600000000 ms
-------------------------------------------
(hive,2)
(spark,2)
(hadoop,2)
(hello,1)
(hbase,1)
*/
6、SparkStreaming整合flume
少包 未
7、SparkStreaming整合kafka
7.1 添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
7.2 启动zookeeper集群
[root@node01] zkServer.sh start
7.3 启动kafka集群
//每台node
[root@node01 servers]# cd /export/servers/kafka_2.11-1.0.0/
[root@node01 kafka_2.11-1.0.0]# nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
7.4 创建topic
[root@node01 kafka_2.11-1.0.0]# kafka-topics.sh --create --topic heima --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
//查看一下
[root@node01 kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 __consumer_offsets
__consumer_offsets
heima
kafkatopic
7.5-1 开启接收器----KafkaUtils.createStream
SparkStreamingKafkaReceiver
1)程序逻辑处理
它是基于receiver接收器去拉取数据,
默认数据丢失,
开启WAL日志,把接收到的数据同步写入到hdfs上,保证数据源的安全性,
后期某些rdd的部分分区数据丢失后,可以通过血统+原始数据重新计算恢复得到。
可以保证数据不丢失,
但是它会出现数据重复处理。
2)代码开发
package cn.itcast.kafka
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable
//todo:sparkStreaming整合kafka-------kafka高级api(消息的偏移量保存zk,基于receiver接收器接受数据)
object SparkStreamingKafkaReceiver {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf = new SparkConf()
sparkConf
.setAppName("SparkStreamingKafkaReceiver")
.setMaster("local[4]") //因为有3个receiver 至少大于3
//【优化2-1】默认会丢失 开启WAL日志,把接收到的数据写入到HDFS上,保证数据源的安全性(血统+数据源)
.set("spark.streaming.receiver.writeAheading.enable","true")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//【优化2-2】设置一个checkpoint目录,用于存储接收到的数据,保证数据的安全
ssc.checkpoint("./day01_wordcount/data/spark-recever")
//4、指定zk地址
val zkQuorum = "node01:2181,node02:2181,node03:2181"
//5、消费组id
val groupId = "spark-receiver"
//6、指定topic信息,key:表示topic名称,value:表示一个recevier接收器需要使用1个线程去拉取数据
val topics: Map[String, Int] = Map("heima" -> 1)
//7、通过recevier接收器接受kafka的topic数据
//(String,String)------>第一个String表示消息的key,第二个String就是消息内容
//[单个接收器]默认使用一个receiver接收器去接收数据,接收数据比较慢,后期可以使用多个receiver接收器接收数据
// val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
//【优化1】 [多个接收器]
val totalReceiverSeq: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
kafkaStream
})
//通过union方法把所有recevier接收器得到的每一个DStream进行合并成一个新的DStream
val totalDStream: DStream[(String, String)] = ssc.union(totalReceiverSeq)
//8、获取topic的数据
val data: DStream[String] = totalDStream.map(x=>x._2) //kafkaStream(单个时)
//9、切分每一行获取所有单词
val words: DStream[String] = data.flatMap(_.split(" "))
//10、每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//11、相同的单词出现的1累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//12、打印
result.print()
//13、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
/*
[root@node01 kafka_2.11-1.0.0]# kafka-topics.sh --create --topic heima --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
[root@node01 kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 __consumer_offsets
__consumer_offsets
heima
kafkatopic
用MyKafkaProducer.java生产一些数据
-------------------------------------------
Time: 1569759125000 ms
-------------------------------------------
(spark,300)
(hadoop,300)
*/
7.5-2 开启接收器----KafkaUtils.createDirectStream
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
1)程序逻辑处理
这个时候并没有使用receeiver接收器去接收数据,
也没有吧接收到的数据同步写入到hdfs上,
相对于第一套api代码开发起来相对简洁。
想要保证数据不丢失 和 数据被处理且只被处理一次
- 数据处理
- 保存偏移量
如果可以确保这2个操作在同一个事务中。
2)代码开发
package cn.itcast.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
//todo:SparkStreaming整合kafka-------使用低级api(消息的偏移量不再由zk保存,客户端自己去维护)
object SparkStreamingDirectKafka {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf = new SparkConf()
.setAppName("SparkStreamingDirectKafka")
.setMaster("local[4]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//设置checkpoint目录------>保存消息消费的偏移量
ssc.checkpoint("./day01_wordcount/data/spark-direct")
//4、指定kafka集群参数
val kafkaParams: Map[String, String] = Map("bootstrap.servers"->"node01:9092,node02:9092,node03:9092","group.id"->"spark-direct")
//5、指定消费的topic名称
val topics: Set[String] = Set("heima")
val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//6、获取tipic内容
val data: DStream[String] = dstream.map(x=>x._2)
//7、切分每一行获取所有单词
val words: DStream[String] = data.flatMap(_.split(" "))
//8、每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map(x=>(x,1))
//9、相同单词出现累加1
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//10、打印
result.print()
//11、开启流失计算
ssc.start()
ssc.awaitTermination()
}
}
/*
-------------------------------------------
Time: 1569700070000 ms
-------------------------------------------
(spark,100)
(hadoop,100)
*/
7.6 kafka生产数据
package cn.itcast.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 不会写看这个网址
* file:///F:/AAAA_HM大数据/00-课件/13kafka消息队列/1、kafka第一天/1、kafka第一天/文档/kafkaAPI/kafka.apache.org/0110/javadoc/index.html
*/
public class MyKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//自定义分区要加一行配置
props.put("partitioner.class", "cn.itcast.kafka.MyKafkaProducer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
// producer.send(new ProducerRecord<String, String>("kafkatopic", Integer.toString(i), Integer.toString(i)));
//kafka第一种分区方式 直接指定分区(没测试)
// producer.send(new ProducerRecord<String, String>("kafkatopic",1,"0","hellohello"+i));
//第二种分区方式,没给定分区号,给定数据的key,通过key获取hashcode,将数据均匀发送到三个分区里
//实际工作要通过key取hashcode进行分区,那么一定要保证key的变化 否则会去往一个分区
// producer.send(new ProducerRecord<String, String>("kafkatopic",i+"",i+""));
//第三种,不给分区,不给key,那么轮训方式发送
// producer.send(new ProducerRecord<String, String>("kafkatopic",i+""));
//第四种:自定义分区
//定义自定义分区类+在这个类前面添加配置
producer.send(new ProducerRecord<String, String>("heima",Integer.toString(i),"hadoop spark"));
}
producer.close();
}
}
8、flume-kafka-sparkStreaming
一路坦荡 遇风扶摇直上
转载:https://blog.csdn.net/weixin_44345917/article/details/102248869
查看评论