小言_互联网的博客

[scala-spark]11. RDD控制操作

315人阅读  评论(0)

Spark可以将RDD持久化到内存或者磁盘,持久化到内存可以极大的提高迭代计算以及计算模型之间的数据共享,一般情况下,执行节点60%内存用于缓存数据,剩下40%用于运行任务。Spark使用persist、cache进行操作持久化,其中cache是persist的特例。

  • cache():RDD[T]
  • persist():RDD[T]
  • persist(level:StorageLevel):RDD[T]

1. 什么情况下需要对数据进行持久化

  1. 某步骤计算特别耗时,重新计算的代价较高,所以进行持久化
  2. 计算链条特别长的情况下,重新计算的代价也较高
  3. checkpoint所在的RDD也一定要持久化数据,checkpoint是lazy的,框架本身会对checkpoint的RDD触发新的job,不进行persist的话,进行checkpoint的时候数据就会重新计算一遍,所以checkpoint之前一定要进行 persist,因为在checkpoint前有了persist的前提下,计算过一遍之后,再进行计算的时候计算速度非常快
  4. shuffle操作之后,因为shuffle要进行网络传输,网络传输风险大,数据极易丢失,所以shuffle之前进行persist避免数据丢失
  5. shuffle操作之前,框架默认帮助我们把数据持久化到本地磁盘,该步骤由框架自动完成

2. 函数接口的使用

  • persist()

//persist():RDD[]
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

默认使用MEMORY_ONLY这个缓存级别

persist(level:StorageLevel):RDD[T]

根据不用的缓存级别对RDD做不同的缓存操作

  • cache()
def cache(): this.type = persist()

cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY将RDD缓存在内存中

  • unpersist()

清除缓存操作

def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
}

 


转载:https://blog.csdn.net/shenziheng1/article/details/101180731
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场