在大数据计算框架中,Shuffle阶段的设计优劣是决定性能好坏的关键因素之一。 为了深入理解Shuffle阶段的各个细节, 并进一步在理解的基础上优化代码,减少不必要的Shuffle开销, 我将通过几篇博客深入分析Spark Shuffle阶段的源代码实现,详细解析Spark Shuffle阶段的实现细节,主要内容包括Shuffle机制框架详解和当前Spark 2.12 中已经支持的Shuffle阶段的2种设计与实现。本文主要介绍Shuffle机制框架详解。
1. Shuffle介绍
在学习Shufile的过程中, 通常都会引用HadoopMapReduce框架中的Shufile过程作为入门或比较。在MapReduce框架中,Shuffle描述着数据从map task输出到reduce task输入的这段过程。shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。因为在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。通常shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝处理。一般将在map端的Shuffle称之为Shuffle Write,在Reduce端的Shuffle称之为Shuffle Read。下面参考网络上描述该过程经典的框架图。
map端的Shuffle简述:
- input, 根据split输入数据,运行map任务;
- patition, 每个map task都有一个内存缓冲区,存储着map的输出结果;
- spill, 当缓冲区快满的时候需要将缓冲区的数据以临时文件的方式存放到磁盘;
- merge, 当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
reduce 端的Shuffle简述:
reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
- Copy过程,拉取数据。
- Merge阶段,合并拉取来的小文件
- Reducer计算
- Output输出计算结果
2. Spark Shuffle框架的演变
2.1 Hash Shuffle
在Spark 1. 1之前,Spark中只实现了 种Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的实现方式中,每个Mapper阶段的Task都会为每个Reduce阶段的Task生成一个文件,通常会产生大量的文件(即对应为MxR个中间文件,其中,M表示Mapper阶段的Task个数,R表示Reduce阶段的task
个数)伴随着大量的随机磁盘IO操作与内存开销。其示意图如下图所示。
图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。
2.2 改进的Hash Shuffle
为了缓解上述问题,在Spark0. 8. 1版本中为基于Hash的Shuffle的实现引入了Shuffle Consolidate机制(即文件合并机制),即将Mapper端生成的中间文件进行合并的处理机制。通过设置配置属性"spark.shuffie. consolidateFiles"为true,来减少中间生成的文件数量。通 过文件合并,可以将中间文件的生成方式修改为每个执行单位(类似于Hadoop的Slot), 为每个Reduce阶段的Task生成一个文件。其中,执行单位对应为:每个Mapper阶段的Core数/每个Task分配的Core数(默认为1)。最终可以将文件个数从M x R修改为E x C/T x R,其中E表示Executor的数目,C表示可用的核数,T表示task所分配的核数。示意图如下图所示:
2.3 Sort Shuffle
基于Hash的Shuffle的实现方式中,生成的中间结果文件的个数都会依赖于Reduce阶段的Task个数,即Reduce端的并行度,因此文件数仍然不可控,无法真正解决问题。 因此,为了更好地解决问题,在Spark 1. 1版本引入了基于Sort的,Shuffle实现方式,并且在Spark1. 2 版本之后,默认的实现方式也从基于Hash的Shuffle修改为基于Sort的Shuffle实现方式。首先每个Mapper不会为每一个Reduce阶段的Task生成一个单独的文件;而是全部写到一个数据(Uata)文件中,同时生成个索引(Index)文件,Reduce阶段的各个Task可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘1/0与内存的开销。最终生成的文件个数减少到2MB, 表示每个Mapper阶段的Task分别生成两个文件,分别为数据文件与索引文件。其实示意图如下图所示。(没找到合适的图,还懒得画,后续补上)
随着Tungsten计划的引入与优化,从Spark I. 4版本开始(Tungsten计划目前在Spark1.5与Spark1. 6两个版本中分别实现了第 与第二两个阶段),在Shuffle过程中也引入了基于Tungsten-Sort的Shuffle实现方式,通过Tungsten项目所做的优化,可以极大地提高Spark在数据处理上的性能。
在Spark2.12的实现中已经去除Hash Shuffle的实现,仅仅支持基于sort的Shuffle
3. Spark的Shuffle框架
从Spark1. 1版本开始,引进了可插拔式的Shuffle框架(通过将Shuffle相关的实现封装到一个统一的对外接口,提供一种具体实现可插拔的框架)。Spark框架中,通过ShuffleManager来管理各种不同实现机制的Shuffle过程,由ShuffleManager统一构建、管理具体实现子类来实现Shuffle框
架的可插拔的Shuffle机制。在详细描述Shuffle框架 实现细节之前, 先给出可插拔式 Shuffle的整体架构的类图,如下图所示。
在DAG的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时会将作业(Job)划分成多个Stage。相应的,在源代码实现中,通过在划分Stage的关键点一构建ShuffleDependency时一一进行Shuffle注册,获取后续数据读写所需的ShuffleHandle。
Stage阶段 划分的详细过程 可以参考DAG Scheduler解析的博客, 最终每个作业(Job)提交后都会对应生成一个ResultStage与若干个ShuffleMapStage, 其中 ResultStage表示生成作业的最终结果所在的Stage。 ResultStage与ShuffleMapStage中的Task分别对应了ResultTask与ShuffleMapTask。一个作业,除了最终的 ResultStage 外,其他若干 ShuffleMapStage 中的各个ShuffieMapTask都需要将最终的数据根据相应的分区器(Part山oner)对数据进行分组(即将数据重组到新的各个分区中),然后持久化分组后的数据。相应的,每个RDD本身记录了 它的数据来源,在计算(compute)时会读取所需的数据,对于带有宽依赖的RDD,读取时会获取在ShuffleMapTask中持久化的数据。
从上图中可以看出,外部宽依赖相关的RDD与ShuffleManager之间的注册交互,通过该注册,每个RDD自带的宽依赖内部会维护Shuffle的唯一标识信息ShuffleId,以及与Shuffle过程具体读写相关的句柄ShuffleHandle,后续在ShuffleMapTask中启动任务(Task)时,可以通过该句柄获取相关的Shuffle写入器实例,实现具体的数据磁盘写操作。
而在宽依赖(ShuffleDependency) 的RDD 中,执行compute 时会去读取上一Stage 为其输出的Shuffle 数据,此时同样会通过该句柄获取相关的Shuffle 读取器实例,实现具体数据的读取操作。需要注意的是, 当前Shuffle 的读写过程中,与BlockManager 的交互,是通过MapOutputTracker 来跟踪Shuffle 过程中各个任务的输出数据的。在任务完成等场景中,会将对应的MapStatus 信息注册到MapOutputTracker 中, 而在compute 时的数据读取过程中,也会通过该跟踪器来获取上一Stage 的输出数据在BlockManager 中的位置,然后通过getReader得到的数据读取器,从这些位置中读取数据。
4. Spark Shuffle框架源代码解析
用户可以通过自定义ShuffleManager接口,并通过指定的配置属性进行设置,也可以过该配置属性指定 Spark 已经支持的 ShuffleManager 具体实现子类。在 SparkEnv 源代码中可以看到设置的配置属性, 以及当前在 Spark 的 ShuffleManager 可插拔框架中已经提供的 ShuffleManager 具体实现, 代码如下。
//配置文件
private[spark] val SHUFFLE_MANAGER =
ConfigBuilder("spark.shuffle.manager")
.version("1.1.0")
.stringConf
.createWithDefault("sort")
// Let the user specify short names for shuffle managers
//在2.12版本中仅支持两种基于sort的Shuffle
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
//获取用户指定的Shuffle名字
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
//获取相应的ShuffleManeger类
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
//创建ShuffleManager实例对象
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
从上面的代码中可以看出, ShuffieManager 是 Spark Shuffle 系统提供的一个可插拔式接口, 可以通过11 spark. shuffle. manager" 配置属性来设置自定义的 ShuffleManager。
在 Driver 和每个 Executor 的 SparkEnv 实例化过程中, 都会创建一个 ShuffleManager, 用于管理块数据, 提供集群块数据的读写, 包括数据的本地读写和读取远程结点的块数据。Shuffle 系统的框架可以以 ShuffleManager 作为入口进行解析。 在 ShuffleManager 中指定了整个 Shuffle 框架所使用的各个组件, 包括如何注册到 ShuffleManager 以获取一个用于数据读写的处理句柄 ShuffleHandle, 通过 ShuffleHandle 获取特定的数据读写接口: Shuffle Writer与ShuffleReader, 以及如何去获取块数据信息的解析接口 ShuffleBlockResolver。下面通过源代码分别对这几个比较重要的组件进行解析。
4.1. Shuffle Manager源码解析
ShuffleManager 是Spark Shuffle 系统提供的一个可插拔式接口, 已经提供的具体实现子类或自定义具体实现子类时, 都需要重写ShuilleManager 类的抽象接口。ShuffleManager 封装了各种Shuffle 机制的具体实现细节,其包含的接口与属性如下所示。
- registerShuffle: 每个RDD 在构建它的父依赖(这里特指ShuffieDependency) 时,都 会先注册到ShuffleManager , 获取ShuffleHandler, 用于后续数据块的读写等。
- get Writer: 可以通过ShuffleHandler 获取数据块写入器,写数据时通过Shuffle 的块解 析器shuffleBloc kResolver 获取写入位置(通常将写入位置抽象为Bucket, 位置的选择则由洗 牌的规则即Shuffle的分区器决定),然后将数据写入到相应位置(理论上,该位置可以位于任何能存储数据的地方,包括磁盘、内存或其他存储框架等,目前在可插拔框架的几种实现 中, Spark 与Hadoop 一样都采用了磁盘的方式进行存储\,主要目的是为了节约内存,同时提 高容错性)。
- getReader: 可以通过ShuffleHandler 获取数据块读取器, 然后通过Shuffle 的块解析 器shuffleBlockResolver 来获取指定数据块。
- unregisterShuffie : 与注册相对应,用于删除元数据等后续清理操作。
- shuffleBlockResolver: Shttlfle 的块解析器,通过该解析器,为数据块的读写提供支撑 层,便于抽象具体的实现细节。
下面分析ShuffleManager 的源代码, 代码如下。
private[spark] trait ShuffleManager {
/**
* 向manager注册shuffle并获取一个句柄,以将其传递给tasks。
*/
def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** 获取给定分区的writer。 被map任务的executors调用*/
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
/**
* 获取reduce分区范围的reader(包括startPartition到endPartition-1,包括首尾)。被resuce任务的executor调用。
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
/**
* 获取reduce分区范围的reader(从startPartition到endPartition-1,包括端点),
* 以便于从map输出(从startMapIndex到endMapIndex-1,包括端点)进行读取。被resuce任务的executor调用。
*/
def getReaderForRange[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
/**
* Remove a shuffle's metadata from the ShuffleManager.
* 从ShuffleManager中移除shuffle的元数据
* @return true if the metadata removed successfully, otherwise false.
*/
def unregisterShuffle(shuffleId: Int): Boolean
/**
* Return a resolver capable of retrieving shuffle block data based on block coordinates.
* 返回一个可以基于块坐标来获取Shuffle块数据的ShuffleBlockResolver
*/
def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
}
4.2 ShuffleHandler源码解析
ShuffleHandle 比较简单, 其本身是一个抽象类,用于记录 Task与Shuffle 相关的一些元数据, 同时也可以作为不同具体Shuffle 实现机制的一种标志信息, 控制不同具体实现子类的选择等。 其类的构造如下所示:
abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
通过查看源码中的类的继承关系,我们可以发现目前ShuffleHandle的直接继承子类只有BaseShuffleHandle,其具体实现如下所示:
private[spark] class BaseShuffleHandle[K, V, C](
shuffleId: Int,
val dependency: ShuffleDependency[K, V, C])
extends ShuffleHandle(shuffleId)
4.3. Shuffle Writer源码解析
继承ShuffleWriter 的每个具体子类会实现 write 接口, 给出任务在输出时的记录具体写的方法。关键代码如下所示:
private[spark] abstract class ShuffleWriter[K, V] {
/** 将记录序列写入此任务的输出*/
@throws[IOException]
def write(records: Iterator[Product2[K, V]]): Unit
/**关闭该writer,并传递的map是否完整 */
def stop(success: Boolean): Option[MapStatus]
}
下面我们看一个ShuffleWriter 的实现类SortShuffleWriter中的write方法的实现,关键代码如下所示:
override def write(records: Iterator[Product2[K, V]]): Unit = {
//创建一个外部排序器
sorter = if (dep.mapSideCombine) {
//判断是否Map端的合并
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
//在这种情况下,我们既不传递聚合器也不传递排序器给排序器,因为我们不在乎键是否在每个分区中排序。 如果正在运行的操作是sortByKey,则将在reduce端完成。
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
//在排序器中传入数据
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
//创建用于写map输出的实例对象
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
//将添加到此ExternalSorter中的所有数据写入mapOutputWriter,该输出将字节推送到某个任意后备存储。
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
//保存ShuffleMapTask返回给scheduler的结果。 包括任务运行所在的块管理器地址,
// 以及每个reduce的输出大小,以便于传递到reduce任务。
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
上述代码中,首先会创建一个外部排序器来对数据进行排序,在这个过程中会判断一下是否在map端聚合,如果需要聚合,则需要传入相应的聚合函数和排序函数。之后会创建map写入实例并且将排序后的数据写入到mapOutputWriter,该输出将字节推送到某个任意后备存储。
4.4. Shuffle Reader源码解析
继承ShuffleReader 的每个具体子类会实现read 接口, 计算时负责从上一阶段Stage 的输出数据中读取记录。如下列代码所示。
private[spark] trait ShuffleReader[K, C] {
/**阅读此reduce 任务的组合key-values*/
def read(): Iterator[Product2[K, C]]
/**
* 关闭该reader
*需要时可以实现
*/
// def stop(): Unit
}
从源代码中可以看出,ShuffleReader只有唯一的实现类BlockStoreShuffleReader类,下面我们看看该类中的read方法的具体实现。
override def read(): Iterator[Product2[K, C]] = {
//创建真正读取数据的ShuffleBlockFetcherIterator实例
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.blockStoreClient,
blockManager,
blocksByAddress,
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
readMetrics,
fetchContinuousBlocksInBatch).toCompletionIterator
//创建dependency序列化实例,用于反序列化
val serializerInstance = dep.serializer.newInstance()
//将获取的流数据通过序列化对象反序列化为key/value类型
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
//为每个读取的记录更新上下文任务度量。
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
//为了支持任务取消,必须在此处使用可中断的迭代器
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
//我们正在读取已合并的值
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
/**
* 我们不知道value的类型,但也不在乎-依赖*应该*确保与该聚合器兼容,它将把值的类型转换为组合类型C
* */
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
// Sort the output if there is a sort ordering defined.
//在基于Sort的Shuffle实现过程中,默认仅仅是基于partitionID进行排序,在芬区的内部数据是没有排序的,因此添加了keyOrdering变量,提供是否需要针对分区内的数据进行排序的标识信息
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
//创建外部排序器用于排序数据,为了减少内存开销
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}
resultIter match {
case _: InterruptibleIterator[Product2[K, C]] => resultIter
case _ =>
new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
}
下面进一步解析数据读取的部分细节,首先是数据块获取,读取ShuffleBlockFetcherlterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地结点或远程结点)分别进行读取,其关键代码如下:
private[this] def initialize(): Unit = {
// Add a task completion callback (called in both success case and failure case) to cleanup.
//添加任务完成回调(在成功和失败情况下都调用)进行清理。
context.addTaskCompletionListener(onCompleteCallback)
// Partition blocks by the different fetch modes: local, host-local and remote blocks.
//通过不同的获取模式对块进行分区:本地,主机本地和远程块。
val remoteRequests = partitionBlocksByFetchMode()
// Add the remote requests into our queue in a random order
//将远程请求以随机顺序添加到我们的队列中
fetchRequests ++= Utils.randomize(remoteRequests)
assert ((0 == reqsInFlight) == (0 == bytesInFlight),
"expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)
// Send out initial requests for blocks, up to our maxBytesInFlight
//发出对块的初始请求,直到我们的maxBytesInFlight
fetchUpToMaxBytes()
val numFetches = remoteRequests.size - fetchRequests.size
logInfo(s"Started $numFetches remote fetches in ${Utils.getUsedTimeNs(startTimeNs)}")
// Get Local Blocks
//获取本地数据块
fetchLocalBlocks()
logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
//如果有主机本地的数据则下载相应数据
if (hostLocalBlocks.nonEmpty) {
blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks)
}
}
与Hadoop 一样,Spark 计算框架也是基于数据本地性,即移动计算而非移动数据的原则,因 此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。另外, 数据块的本地性是通过ShuffleBlockFetcherlterator实例构建时所传入的位置信息 来判断的,而该 信息由MapOutputTracker实例的getMapSizesByExecutorld方法提供。可以参 考该方法的返回值类型查看相关的位置信息, 返回值类型为: Seq[(BlockManagerId, Seq[(BlockId, Long)])],其中BlockManagerld是BlockManager的唯一标识信息,Blockld是数据块的唯一信息,对应的Seq[ (BlockId , Long)]表示一组数据块标识ID及其数据块大小的元组信息。
4.5 ShuffleBlockResolver源码解析
ShuffleBlockResolver 的源代码如下所示,该类是一个特质。该特质的具体实现子类知道如何通过一个逻辑Shuflle块标识信息来获取一个块数据。具体实现可以使用文件或文件段来封装Sh咄1e的数据。这是获取Shuflle块数据时所使用的抽象接口,在BlockStore中使用。
trait ShuffleBlockResolver {
//ShuffleID
type ShuffleId = Int
/**
* 检索指定块的数据。
*
*当dirs参数为None时,请使用磁盘管理器的本地目录。 否则,请从指定目录中读取。
*如果该块的数据不可用,则抛出未指定的异常。
*/
def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
def stop(): Unit
}
继承ShuffieBlockResolver的每个具体子类会实现getBlockData 接口, 给出具体的获取块数据的方法。
目前在ShuffleBlockResolver 的各个具体子类中, 除了给出获取数据的接口之外, 通常会提供如何解析块数据信息的接口, 即提供了写数据块时的物理块与逻辑块之间映射关系的解析方法。
5. Shuffle的注册
从Spark的Shuffle框架图中可以看出,当构建一个宽依赖的RDD时,会向ShuffleManager注册。之所以在构建宽依赖时注册, 其原因在于DAG调度器中的Stage是根据宽依赖划分的。下面我们看一下宽依赖(ShuffleDepedency)中与注册相关的源代码如下所示:
//ShuffleID,唯一标识,可以看到是通过rdd的上下文获取的,因此针对特定的RDD,每个shuffleID都是唯一的
val shuffleId: Int = _rdd.context.newShuffleId()
//获取ShuffleHandler实例,后续获取Shuffler读取器和写入器时需要
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
//Shuffle数据清理器的设置?? _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
到这里Shuffle机制框架结构基本介绍完毕了,关于其中的基于sort的Shuffle实现代码将在后续的博客中介绍。
参考书籍及博客:
《Spark内核机制解析及性能调优》
https://www.cnblogs.com/itboys/p/9226479.html
https://www.jianshu.com/p/500e8976642f
如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。
希望热爱技术的小伙伴私聊,一起学习进步
来自于热爱编程的小白
转载:https://blog.csdn.net/qq_16669583/article/details/106142919