Spark RDD算子
RDD 方法也叫做RDD算子,主要分为两类,第一类是用来做转换的,例如flatMap()
,Map()
方法,第二类是行动的,例如:collenct()
方法,只有触发了作业才会被执行。
一、RDD 转换算子
RDD 根据数据处理方式的不同将算子整体上分为Value
类型,双Value
类型和Key-value
类型。
1、Value 类型
(1) map
将处理的数据逐条
进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//RDD 算子转换类型
class Spark01_RDD_Transform {
}
object Spark01_RDD_Transform{
def main(args: Array[String]): Unit = {
//配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_zhuanhuan")
val context = new SparkContext(conf)
//TODO 算子 => map
val rdd = context.makeRDD(List(1, 2, 3, 4)) //基于内存创建一个RDD
// def hanshu(num:Int):Int = {
// num * 2
// }
//
// val value1 = rdd.map(hanshu)
// value1.collect().foreach(println)
val value = rdd.map(a => a * 2)
println(value.collect().foreach(println))
context.stop()
}
}
map 算子的小测试:从服务器日志数据 apache.log中获取用户请求URL资源的路径
思路:文件最右边的那个是文件的路径。可以使用map方法,里面split(" ")
方法用空格分隔开,然后再使用takeRight()
方法,取最右边的第一个元素,那就是文件的地址了
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//map 算子的小测试:从服务器日志数据 apache.log中获取用户请求URL资源的路径
class Spark02_RDD_test {
}
object Spark02_RDD_test{
def main(args: Array[String]): Unit = {
//配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_zhuanhuan")
val context = new SparkContext(conf)
//TODO 算子 => map
val rdd = context.textFile("datas/apache.log")
//长的字符串
//短的字符串
val value = rdd.map(
a => a.split(" ").takeRight(1)//将文件按照空格隔开,然后拿最右边的那一个数据
)
value.collect().foreach(println)
context.stop()
}
}
map 分区数据执行顺序测试:
1、rdd的计算一个分区内的那么数据是一个一个执行逻辑
只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
一个分区内的数据的执行是有序的,
2、不同分区数据计算是无序的
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//测试分区的执行的顺序
class Spark02_RDD_Transform_Par {
}
object Spark02_RDD_Transform_Par{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")
val context = new SparkContext(conf)
//1、rdd的计算一个分区内的那么数据是一个一个执行逻辑
//只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
//一个分区内的数据的执行是有序的,
//2、不同分区数据计算是无序的
val rdd = context.makeRDD(List(1,2,3,4),2)
val rddMap = rdd.map(num => {
println("<<<"+num)}) //第一个map转换
val rddMap1 = rddMap.map(num=>{
println("###"+num)}) //第二个map转换
//发现并行计算是没有顺序的
rddMap.collect().foreach(println) //第一个rddMap执行
rddMap1.collect().foreach(println) //第二个rddMap执行,然后查看他们输出的顺序
context.stop()
}
}
(2) mapPartitions
1)函数说明
将待处理的数据以分区为单位
发送到计算节点进行处理,这里的处理是值可以进行任意的处理,哪怕是数据过滤。例如这里过滤掉等于2的数据。
val dataRDD1 = dataRDD.mapPartitions(
datas => {
datas.filter(_ == 2)
}
)
说明:
map
是一个一个执行的,类似于之前的字节流,所以效率肯定不高,所以需要一个像之前优化字节流的缓冲区那样的方法,所以有了mapParitions
方法,mapParitions
方法是将一个分区内的数据全部拿到之后,然后再进行map操作,那效率肯定就高得多。
注意:
mapPartitions:可以以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存中进行引用,如果处理完的数据是不会被释放掉,存在对象的引用,所以在内存比较小的情况下,数据量较大的情况下,容易出现内存溢出。
总结:两个方法的应用场景不同,如果内存足够那么mapPartitions
方法肯定是效率更高的,但是mapPartitions
方法存在对象引用,操作完之后内存不会被释放。要是内存小,数据量大的情况下那么最好使用map
方法,因为是一条一条操作的,执行完之后内存就会被释放,没有对象引用,虽然效率会低一点,但是不会出错。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//map 是一个一个执行的,类似于之前的字节流,所以效率肯定不高
//所以需要一个像之前优化字节流的缓冲区那样的方法
//所以有了mapParitions 方法
class Spark02_RDD_Transform {
}
object Spark02_RDD_Transform{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD
//TODO 算子 - mapPartitions
//mapPartitions:可以以分区为单位进行数据转换操作
//但是会将整个分区的数据加载到内存中进行引用
//如果处理完的数据是不会被释放掉,存在对象的引用
//所以在内存比较小的情况下,数据量较大的情况下,容易出现内存溢出。
//这个方法之所以高效,他是把一个分区内的数据全部拿到之后才开始做操作
//而不是一个一个的做操作
val mpRDD = rdd.mapPartitions(a => {
//这这个方法执行底层是迭代器
println(">>>>>>>>>>")
a.map(_ * 2) //相当于先把一个分区内的数据聚合了,然后再进行map操作,这个效率就要高得多了
})
mpRDD.collect()foreach(println)
context.stop()
}
}
2)小案例获取每个分区的最大值
首先创建RDD的时候,就设置好分区数。
思路:因为mapPartitions
方法是将待处理的数据以分区为单位
发送到计算节点进行处理,所以我们可以直接用它直接按照每一个分区进行操作,然后直接max方法获取最大值。但是这里的难点在于,mapPartitions
方法返回的是一个迭代器,而max
方法返回的是一个Int类型的值,所以我们需要用List
或者其他类型的集合都可以,给它包裹起来,然后用toIterator
方法进行转换,例如List(a.max).toIterator
。最后就可以得到每一个分区的最大值了,第一个分区1,2 第二个分区的数据3,4 所以最后输出的是2,4。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//案例:获取每个分区的最大值
class Spark02_RDD_Transform_Par2 {
}
object Spark02_RDD_Transform_Par2{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD
//TODO 算子 - mapPartitions
val mpRDD = rdd.mapPartitions(a => {
//这这个方法执行底层是迭代器
println(">>>>>>>>>>")
List(a.max).toIterator //因为mapPartitions方法返回的是一个迭代器,a.max得到的是一个Int的数值
}) //所以我们的用列表,或者其他的集合都可以把他包起来,然后toIterator将它转换为迭代器就可以了
mpRDD.collect().foreach(println) //得到的结果应该是2和4,第一个分区1,2 第二个分区2,4
context.stop()
}
}
(3) map 和 mapParitions 的区别
数据处理角度:
Map 算子是分区内一个数据一个数的执行,类似于串行操作。而mapParitions算子是已分区为单位进行批处理操作。
功能的角度:
Map 算子主要目的是将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapParitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
性能的角度:
Map 算子因为类似于串行操作,所以性能比较低,mapParitions 算子类似于批处理,所以性能较高。但是mapParitions 算子会长时间占用内容,那么这样会导致内存可能不够用,出现内存溢出的错误,所以在内存有限的情况下
,不推荐使用,推荐使用map操作。
(4) mapParitionsWithIndex
函数说明:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
1)小案例只获取第二个分区的最大值
就是跟mapParitions
方法一样的,只是多了一个分区编号,可以指定操作哪一个分区。在某些时候非常有用,比如有两个分区,我只要第二个分区的最大值,第一个分区的数据不要。
思路:
里面第一个参数是分区的索引,第二个参数是迭代器也就是分区的所有数据。我们可以对分区进行判断,如果等于1
说明就是第二个分区,我们直接返回那个迭代器,然后求的是第二个分区的最大值,我们再像刚刚一样用集合包起来,然后使用toIterator
方法进行转换。然后如果不为1的话那么返回一个空的迭代器,Nil.iterator
Nil 方法是空集合,空集合.迭代器,就是空迭代器。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
//mapParitionsWithIndex 方法 比mapParitions多了一个分区编号
class Spark03_RDD_mapParitionsWithIndex {
}
object Spark03_RDD_mapParitionsWithIndex{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD
//TODO 算子 - mapPartitionsWithIndex
//[1,2][3,4]
val mpRDD = rdd.mapPartitionsWithIndex(
(index,iter) => {
//第一个参数是索引的编号,第二个参数是全部的数据,就是迭代器
if (index == 1){
List(iter.max).toIterator //因为我们只要第二个分区,第一个分区索引为0,第二个分区索引为1,如果1就直接返回迭代器
}else{
Nil.iterator //如果不是1,那么我们返回一个空的迭代器,Nil 空集合
}
}
)
mpRDD.collect().foreach(println)
context.stop()
}
}
2)小案例获取每一个数据的分区来源
分为了4个分区
思路:
使用mapPartitionsWithIndex
方法,第一个是索引第二个是迭代器,分区中的每一个数据,然后对迭代器进行map操作,映射,第一个参数是分区的索引,第二个参数是分区中的每个数据。就取出来了。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.{
SparkConf, SparkContext}
获取每一个数据来自于哪一个分区
class Spark03_RDD_mapParitionsWithIndex2 {
}
object Spark03_RDD_mapParitionsWithIndex2{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")
val context = new SparkContext(conf)
val rdd = context.makeRDD(List(1, 2, 3, 4), 4) //创建一个RDD
//TODO 算子 - mapPartitionsWithIndex
//[1,2][3,4]
val mpRDD = rdd.mapPartitionsWithIndex(
(index,iter) => {
iter.map(
a => {
(index,a) //第一个是分区索引,第二个是每一个数据
}
)
}
)
mpRDD.collect().foreach(println)
context.stop()
}
}
(5) flatMap
1) 函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。
2) 小案例将List(List(1,2),3,List(4,5)) 进行扁平化操作
思路:
要是列表里面的数据类型都是一样的话,比如 List(List(1,2),List(4,5)),就是两个列表那么直接rdd.flatMap(a => a)
直接输出这个列表扁平化就完成了,非常简单,但是要是列表中不只是只有列表,比如List(List(1,2),3,List(4,5))
里面有个3,他不是集合,数据类型不一样,这时候就要进行模式匹配了。首先匹配,如果是列表那么就直接输出列表,如果不是列表那么就List()
把它包裹起来,这不就变成列表了嘛,就可以对三个列表进行扁平化了。
package com.atguigu.bigdata.spark.core.wc.operator
import org.apache.spark.rdd.RDD
import org.apache.spark.{
SparkConf, SparkContext}
class tset2 {
}
object tset2{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val context = new SparkContext(conf)
//两个列表进行扁平化合并
val rdd:RDD[List[Int]] = context.makeRDD(List(List(1,2),List(4,5)))
val rddMap = rdd.flatMap(a => a)
rddMap.collect().foreach(println)
//单词进行扁平化,只有字符串类型的才有split(" ")方法
val rdd2 = context.makeRDD(List("Hello world", "Helllo Spark", "Hello Scala"))
val rddMap2 = rdd2.flatMap(_.split(" "))
rddMap2.collect().foreach(println)
println("===============")
val rdd3: RDD[Any] = context.makeRDD(List(List(1, 2), 3, List(4, 5)))
val rddFlatmap = rdd3.flatMap {
case list: List[_] => list //模式匹配。如果是集合类型的那么就返回这个集合
case list2 => List(list2) //如果不是集合的那么用集合把它包起来那不就是集合了嘛
}
rddFlatmap.collect().foreach(println)
context.stop()
}
}
转载:https://blog.csdn.net/m0_72168501/article/details/128742657