小言_互联网的博客

Spark Streaming 快速入门系列(5) | 还不会DStream转换,一文带你深入了解

528人阅读  评论(0)

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是DStream 转换。



关于转换这方面的一些具体问题,如果想要了解可以点击下列网址进行查看
http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams


  上图为官网的解释,我们可以翻译为:
  与RDD相似,转换允许修改来自输入DStream的数据。DStream支持普通Spark RDD上可用的许多转换。

  除此之外,DStream分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform()以及各种Window相关的原语。

  • 一些常见的方法


  在DStream转换中,大体可分为无状态转换操作和有状态转换操作两种! 下面就围绕这两个方面进行详细讲解。

一. 无状态转换操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。

  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()会化简每个时间区间中的数据,但不会化简不同区间之间的数据。

  举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。

  无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()join()leftOuterJoin() 等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。

  我们还可以像在常规的 Spark 中一样使用 DStreamunion() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。

transform操作

  transform 原语允许 DStream上执行任意的RDD-to-RDD函数。

  可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来.

  该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

  • 1. 样例源码
package com.buwenbuhuo.spark.streaming.day02
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{
   Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object TransformDemo {
   
  def main(args: Array[String]): Unit = {
   
    val conf = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002", 9999)

    val resultDStream = dstream.transform(rdd => {
   
      rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
    })
    resultDStream.print


    ssc.start()
    ssc.awaitTermination()
  }

}

  • 2. 运行结果

二. 有状态转换操作

  此部分主要介绍两个有状态的操作

2.1 updateStateByKey


  上图为官方解释,下面为翻译:

  updateStateByKey操作允许在使用新信息不断更新状态的同时能够保留他的状态.

需要做两件事情:

  1. 定义状态. 状态可以是任意数据类型
  2. 定义状态更新函数. 指定一个函数, 这个函数负责使用以前的状态和新值来更新状态.

  在每个阶段, Spark 都会在所有已经存在的 key 上使用状态更新函数, 而不管是否有新的数据在.

def updateStateByKey[S: ClassTag](
                 updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

  • 1. 样例源码
package com.buwenbuhuo.spark.streaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{
   Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object UpstateByKeyDemo {
   
  def main(args: Array[String]): Unit = {
   
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpstateByKeyDemo")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("ck1")

    ssc
        .socketTextStream("hadoop002",9999)
        .flatMap(_.split("\\W+"))
        .map((_,1))
        .updateStateByKey[Int]((seq:Seq[Int],opt:Option[Int]) =>{
   
                Some(seq.sum + opt.getOrElse(0))
    })
        .print(1000)


    ssc.start()
    ssc.awaitTermination()
  }
}

  • 2. 运行结果

  • 3. 源码流解析

2.2 window 操作(窗口操作)

  Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据.

  默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上.

  一个窗口可以包含多个时间段. 基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。


  上图所示, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.

在上面图的情况下, 操作会至少应用在 3 个数据单元上, 每次滑动 2 个时间单位. 所以, 窗口操作需要 2 个参数:

  1. 窗口长度 – 窗口的持久时间(执行一次持续多少个时间单位)(图中是 3)
  2. 滑动步长 – 窗口操作被执行的间隔(每多少个时间单位执行一次).(图中是 2 )

  注意: 这两个参数必须是源 DStream 的 interval 的倍数.

  • 一些常见的窗口操作
    所有这些操作均采用上述两个参数-windowLength和slideInterval。

  • 1. 测试程序源码
package com.buwenbuhuo.spark.streaming.day02.window
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{
   Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window1 {
   
  def main(args: Array[String]): Unit = {
   
    val conf = new SparkConf().setAppName("Window1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    ssc
      .socketTextStream("hadoop002",9999)
      .flatMap(_.split("\\W+"))
      .map((_,1))
        .reduceByKeyAndWindow(_ + _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  • 2. 运行结果

2.3 window的优化操作

  • 1. reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.
 */
val count: DStream[(String, Int)] =
wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))

  • 2. reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration)
    比没有invReduceFunc高效. 会利用旧值来进行计算.

invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了. 第一个参数就是新的值, 第二个参数是旧的值.

ssc.sparkContext.setCheckpointDir("hdfs://hadoop002:9000/checkpoint")
val count: DStream[(String, Int)] =
    wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(15), Seconds(10))

  • 3. window(windowLength, slideInterval)
    基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream
package com.buwenbuhuo.spark.streaming.day02.window

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{
   Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 22:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window2 {
   
  def main(args: Array[String]): Unit = {
   
    val conf = new SparkConf().setAppName("Window2").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    
    ssc.checkpoint("ck3")
    ssc
      .socketTextStream("hadoop002",9999)
      .window(Seconds(9),Seconds(6))
      .flatMap(_.split("\\W+"))
      .map((_,1))
        .reduceByKeyAndWindow(_ + _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 4. countByWindow(windowLength, slideInterval)
    返回一个滑动窗口计数流中的元素的个数。

  • 5. countByValueAndWindow(windowLength, slideInterval, [numTasks])
    对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

  本次的分享就到这里了,


  好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
  如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
  码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!



转载:https://blog.csdn.net/qq_16146103/article/details/107961019
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场