当前位置 :首页 >> 社会

大数据开发之Spark Shuffle 原理量化

2024-11-06   来源 : 社会

p._1

val dep = rddAndDep._2

// While we use the old shuffle fetch protocol, we use partitionId as mapId in the

// ShuffleBlockId construction.

val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {

partitionId

} else context.taskAttemptId()

dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)

为了让ShuffleWriter

在ShuffleWriteProcessor的write分析方法中所首到时通过ShuffleManager借助writer最简单,然后再继续由附加的writer制订确切的write命题:

def write(

rdd: RDD[_],

dep: ShuffleDependency[_, _, _],

mapId: Long,

context: TaskContext,

partition: Partition): MapStatus = {

var writer: ShuffleWriter[Any, Any] = null

try {

val manager = SparkEnv.get.shuffleManager

writer = manager.getWriter[Any, Any](

dep.shuffleHandle,

mapId,

context,

createMetricsReporter(context))

writer.write(

rdd.iterator(partition, context).asInstanceOf[Iterator[_

Spark根据ShuffleHandle的各有不同换用附加的ShuffleWriter的动手到,有数:UnsafeShuffleWriter、BypassMergeSortShuffleWriter和SortShuffleWriter三种:

override def getWriter[K, V](

handle: ShuffleHandle,

mapId: Long,

context: TaskContext,

metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {

val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(

handle.shuffleId, _ => new OpenHashSet[Long](16))

mapTaskIds.synchronized { mapTaskIds.add(mapId) }

val env = SparkEnv.get

handle match {

case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>

new UnsafeShuffleWriter(

case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>

new BypassMergeSortShuffleWriter(

case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>

new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)

}

而确切的ShuffleHandle的为了让是根据shuffle黎曼实际上的partition多达、确实须要制订到时后顺序排列或者生成等持续性来确定的:

override def registerShuffle[K, V, C](

shuffleId: Int,

dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {

if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {

new BypassMergeSortShuffleHandle[K, V](

shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])

} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {

new SerializedShuffleHandle[K, V](

shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])

} else {

new BaseShuffleHandle(shuffleId, dependency)

}

}

下面分别参阅这三种ShuffleHandle的为了让命题:

1、BypassMergeSortShuffleHandle

BypassMergeSortShuffleHandle完全相近BypassMergeSortShuffleWriter,当不须要动手map端的生成,并且北区多达高于SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD(可选200)时换用这种方式为,可以上到在明文系统中所到时后顺序排列和生成的流程:

if (dep.mapSideCombine) {

false

} else {

val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)

dep.partitioner.numPartitions

}

BypassMergeSortShuffleWriter不须要将shuffle纪录到时写成入明文系统文件系统构件中所,而是根据样本的key取值得不到reduce北区,并创建者完全相近的DiskBlockObjectWriter都可将样本纪录从外部写成入到各北区完全相近的临时明文中所;【关注尚硅谷,精彩学IT】最后再继续将各有不同北区的临时明文拆分生产data和index明文即可。

2、SerializedShuffleHandle,该方式为用于了tungsten基于明文系统压缩的机制,缓解shuffle流程中所的明文系统冲击从而动手到shuffle更快。

换用该方式为须要满足三个前提:

if (!dependency.serializer.supportsRelocationOfSerializedObjects) {

log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +

s"${dependency.serializer.getClass.getName}, does not support object relocation")

false

} else if (dependency.mapSideCombine) {

log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +

s"map-side aggregation")

false

} else if (numPartitions> MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {

log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +

s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")

false

} else {

log.debug(s"Can use serialized shuffle for shuffle $shufId")

true

}

1)dependency的元数据容器赞同relocation

如果应用程序程序中所换用DataFrame、DataSet样本模型则等底层用于SparkSQL文件系统,当出现shuffle的持续性下,优化容器在订立物理学计划才会框架ShuffleExchangeExec路由器,并换用UnsafeRowSerializer,该元数据容器的supportsRelocationOfSerializedObjects属性为true,即赞同对元数据都可进行时到时后顺序排列;另外,如果应用程序以外用于KryoSerializer元数据容器或者纪录的key和value为原生样本类别或者string类别也换用KryoSerializer元数据容器,此时upportsRelocationOfSerializedObjects属性为true;否则用于可选的JaSerializer,该属性的取值为false。

2)不须要制订map端拆分:

如果换用非生成类黎曼例如join相关黎曼时完全相近dependency的mapSideCombine属性取值为false;如果换用生成类黎曼如reduceByKey、aggregateByKey、combineByKey等mapSideCombine属性为true;注意制订groupByKey黎曼时该属性也为false:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {

val bufs = combineByKeyWithClassTag[CompactBuffer[V]](

createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

3)shuffledependency的partition多达高于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE,即16777215。

3、如果未能用于前两种ShuffleHandle,则换用BaseShuffleHandle,完全相近ShuffleWriter为SortShuffleWriter。

综合以上分析,Spark根据ShuffleHandle的各有不同而为了让附加的ShuffleWriter的动手到,年中我们来详细阐述这三种ShuffleWriter中所较为十分相似的动手到方式为SortShuffleWriter的制订物理现象,在全面性的Spark明文系统管理的发表文章中所我们将对UnsafeShuffleWriter以明文系统的取向进行时阐述;而BypassMergeSortShuffleWriter则是SortShuffleWriter的特殊持续性,即上到了map到时后顺序排列和生成外。

SortShuffleWriter

SortShuffleWriter通过insertAll分析方法首到时将参与shuffle的样本写成入到shuffle文件系统以下中所,当文件系统以下的空间内增大到未能继续写成入MLT-将样本溢写成到闪存中所。

Shuffle文件系统以下的动手到有两种样本构件:如果参与shuffle的黎曼须要动手生成则将样本纪录写成入到样本构件PartitionedAppendOnlyMap中所,该构件是一个HashMap,key为partitionId和纪录的key取值,并且每解决问题一个纪录均才会更新完全相近的key的value取值;如果黎曼不须要动手生成则换用PartitionedPairBuffer的样本构件,并将纪录的key和value到时后顺序插入到buffer多达第三组中所:

if (shouldCombine) {

// Combine values in-memory first using our AppendOnlyMap

val mergeValue = aggregator.get.mergeValue

val createCombiner = aggregator.get.createCombiner

var kv: Product2[K, V] = null

val update = (hadValue: Boolean, oldValue: C) => {

if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)

}

while (records.hasNext) {

addElementsRead()

kv = records.next()

map.changeValue((getPartition(kv._1), kv._1), update) //更新hashmap中所的value取值

maybeSpillCollection(usingMap = true)

}

} else {

// Stick values into our buffer

while (records.hasNext) {

addElementsRead()

val kv = records.next()

buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) //将纪录到时后顺序插入到buffer中所

maybeSpillCollection(usingMap = false)

}

}

由此可见,换用PartitionedAppendOnlyMap这种样本构件可以节约明文系统空间内、减少闪存溢写成以及shuffle拉取的的网络开支,这也是reduceByKey比groupByKey可靠性更非常容易的原因;同时,也是为什么shuffle在没有生成的持续性下须要换用tungsten的存储器方式为来进一步急剧提高制订可靠性。

每次写成入纪录不久都才会辨别确实须要将明文系统中所的样本进行时溢写成,主要的辨别命题是当shuffle文件系统的样本量超越近期的阈取值不久到时前适配shuffle文件系统以下,当适配不久的空间内仍然不足的持续性下则开始制订溢写成命题:

if (elementsRead % 32 == 0 && currentMemory>= myMemoryThreshold) {

// Claim up to double our current memory from the shuffle memory pool

val amountToRequest = 2 * currentMemory - myMemoryThreshold

val granted = acquireMemory(amountToRequest)

myMemoryThreshold += granted

// If we were granted too little memory to grow further (either tryToAcquire returned 0,

// or we already had more memory than myMemoryThreshold), spill the current collection

shouldSpill = currentMemory>= myMemoryThreshold

}

shouldSpill = shouldSpill || _elementsRead> numElementsForceSpillThreshold

// Actually spill

if (shouldSpill) {

_spillCount += 1

logSpillage(currentMemory)

spill(collection)

_elementsRead = 0

_memoryBytesSpilled += currentMemory

releaseMemory()

}

到时后顺序排列:

如果在shuffle依赖中所以外了到时后顺序排列的到时后顺序或者生成算法则表述到时后顺序排列函多达keyComparator:

private def comparator: Option[Comparator[K]] = {

if (ordering.isDefined || aggregator.isDefined) {

Some(keyComparator)

} else {

None

}

}

在具备到时后顺序排列函多达的持续性下,PartitionedAppendOnlyMap和PartitionedPairBuffer分别动手到了partitionedDestructiveSortedIterator函多达,对样本纪录首到时根据北区到时后顺序排列,然后再继续根据key进行时到时后顺序排列:

/**

* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.

*/

def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =

(a: (Int, K), b: (Int, K)) => {

val partitionDiff = a._1 - b._1

if (partitionDiff != 0) {

partitionDiff

} else {

keyComparator.compare(a._2, b._2)

}

}

闪存刷写成:

通过上去的流程将须要溢写成的样本在明文系统中所到时后顺序排列并封装到一个给定容器都可inMemoryIterator中所,然后再继续通过ExternalSorter子程序spillMemoryIteratorToDisk分析方法将到时后顺序排列后的样本写成到IO输出关键点,当超越输出关键点的容量上限(固定式项:spark.shuffle.file.buffer,可选32K) 或者纪录的个多达超过SHUFFLE_SPILL_BATCH_SIZE的取值(固定式项:spark.shuffle.spill.batchSize,可选10000),则将样本flush到闪存。因此如果一个作业shuffle溢写成的样本量相当大,可以前提调大相关固定式参多达从而降低闪存IO的可靠性开支:

val (blockId, file) = diskBlockManager.createTempShuffleBlock() //创建者临时溢写成明文,并为其生成blockId

val writer: DiskBlockObjectWriter =

blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) //框架fileBufferSize大小的关键点writer

try {

while (inMemoryIterator.hasNext) {

val partitionId = inMemoryIterator.nextPartition()

require(partitionId>= 0 && partitionId

s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")

inMemoryIterator.writeNext(writer) //将都可的键取值对写成到关键点

elementsPerPartition(partitionId) += 1

objectsWritten += 1

if (objectsWritten == serializerBatchSize) { //当都可多达超越上限则flush到闪存中所

flush()

}

将明文系统中所的纪录到时进行时到时后顺序排列并刷写成到临时闪存明文不久,再继续将该明文追加到spills以下中所,spills以下是一个ArrayBuffer[SpilledFile]的样本构件,对此一个task所有的待拆分明文的论域:

val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)

spills += spillFile

拆分溢写成明文:

当所有须要shuffle的纪录均解决问题完成并溢写成不久,ExternalSorter针对每个map北区子程序writePartitionedMapOutput分析方法将溢写成到闪存的临时明文和以及明文系统中所样本进行时都需到时后顺序排列,并写成入到一个data明文中所,确切动手到流程如下:

1.根据shuffleId、mapId创建者该北区溢写成明文拆分后的data明文,明文名为:

name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

2.首到时针对每个reduce北区,结点所有的临时溢写成明文和明文系统中所的纪录,将属于该北区的纪录根据key取值进行时生成浮点;如果须要到时后顺序排列,则到时对纪录进行时都需到时后顺序排列再继续根据key取值动手生成;最后生成一个(partitionId,partitionId完全相近的纪录以下) 的二元第三组给定容器。

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])

: Iterator[(Int, Iterator[Product2[K, C]])] = {

val readers = spills.map(new SpillReader(_))

val inMemBuffered = inMemory.buffered

(0 until numPartitions).iterator.map { p =>

val inMemIterator = new IteratorForPartition(p, inMemBuffered)

val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)

if (aggregator.isDefined) {

// Perform partial aggregation across partitions

(p, mergeWithAggregation(

iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))

} else if (ordering.isDefined) {

// No aggregator given, but we he an ordering (e.g. used by reduce tasks in sortByKey);

// sort the elements without trying to merge them

(p, mergeSort(iterators, ordering.get))

} else {

(p, iterators.iterator.flatten)

}

}

}

3.结点第2步中所生成的二元第三组给定容器,依序为每个reduce partitionId创建者一个ShufflePartitionPairsWriter都可,并将partitionId完全相近的所有纪录的key和value取值写成入到在步骤1中所创建者的data明文中所:

for ((id, elements)

val blockId = ShuffleBlockId(shuffleId, mapId, id)

var partitionWriter: ShufflePartitionWriter = null

var partitionPairsWriter: ShufflePartitionPairsWriter = null

TryUtils.tryWithSafeFinally {

partitionWriter = mapOutputWriter.getPartitionWriter(id)

partitionPairsWriter = new ShufflePartitionPairsWriter(

partitionWriter,

serializerManager,

serInstance,

blockId,

context.taskMetrics().shuffleWriteMetrics)

if (elements.hasNext) {

for (elem

partitionPairsWriter.write(elem._1, elem._2)

}

}

}

须要说明的是,在将shuffle样本进行时拆分的流程还才会累计各个patition完全相近样本所占用的存储器空间内的大小,这些反馈换用partitionLengths多达第三组进行时纪录,partitionLengths多达第三组是一个下标为partitionId、取值为完全相近北区的样本阔度的长整型多达第三组。

框架资料库明文:

由于在创建者的data明文的流程中所还框架了partitionLengths多达第三组,就可以方便使用的发觉各北区的样本在data明文中所的x,以便于在reduce过渡阶段短时间检索data明文中所的样本,避免了大量shuffle明文的全量扫描,从而提高shuffle读过过渡阶段的解决问题可靠性。年中参阅为每个data明文框架资料库明文的流程:

1.在IndexShuffleBlockResolver类的writeIndexFileAndCommit分析方法中所,根据shuffleId、mapId即"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" 作为资料库明文的英文名称,并且该明文名也完全相近存储器系统中所的BlockId,然后通过完全相近executor的DiskBlockManager都可在localDir(一般是spark.local.dir固定式项)目录中所创建者一个index明文;

def getIndexFile(

shuffleId: Int,

mapId: Long,

dirs: Option[Array[String]] = None): File = {

val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)

dirs

.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))

.getOrElse(blockManager.diskBlockManager.getFile(blockId))

}

2.根据partitionLengths多达第三组中所的length取值进行时都能累加计算,给予每个reduce task的样本在data明文中所的在在xoffset,并将其纪录在index明文中所,用于全面性短时间检索完全相近北区的shuffle样本:

var offset = 0L

out.writeLong(offset)

for (length

offset += length

out.writeLong(offset)

}

Reduce 过渡阶段的样本解决问题

在shuffle的Map过渡阶段也即shuffle write过渡阶段完成了样本的溢写成和拆分,年中投到入shuffle的Reduce过渡阶段也即shuffle read 过渡阶段。

我们发觉,所有RDD都才会制订其compute分析方法,在ShuffleRDD的compute分析方法中所才会初始化一个reader都可并子程序其read()分析方法并在动手到了如上命题:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {

val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]

val metrics = context.taskMetrics().createTempShuffleReadMetrics()

SparkEnv.get.shuffleManager.getReader(

dep.shuffleHandle, split.index, split.index + 1, context, metrics)

.read()

.asInstanceOf[Iterator[(K, C)]]

}

初始化BlockStoreShuffleReader都可

在reduce过渡阶段,SortShuffleManager首到时通过MapOutputTracker根据shuffleId从mapStatuses中所借助blocksByAddress都可,该都可的样本构件为:Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],对此一个reduce北区所需要拉取的shuffle明文的存储器反馈(有数BlockManagerId以及BlockId、元组多达、mapId的论域);年中创建者BlockStoreShuffleReader都可用于读过取blocksByAddress中所所以外的shuffle明文:

override def getReader[K, C](

handle: ShuffleHandle,

startMapIndex: Int,

endMapIndex: Int,

startPartition: Int,

endPartition: Int,

context: TaskContext,

metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {

val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(

handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

new BlockStoreShuffleReader(

handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,

shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))

}

其中所,BlockId为ShuffleBlockId的最简单,其编码方式为为"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId,block所占元组多达通过MapStatus借助:

for (part

val size = status.getSizeForBlock(part)

if (size != 0) {

splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=

((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex))

}

}

制订read()分析方法

BlockStoreShuffleReader通过制订其read()分析方法从本地借助或者通过的网络从其他路由器拉取shuffle样本,并对这些样本进行时进一步解决问题,年中我们来看一下read()分析方法的确切动手到:

1、借助shuffle map样本

在read()分析方法中所首到时框架了ShuffleBlockFetcherIterator最简单,并通过ShuffleBlockFetcherIterator的initialize()分析方法来动手到shuffle纪录的读过取:

1)子程序partitionBlocksByFetchMode分析方法根据shuffle明文的位置反馈划分为各有不同的配置文件:

根据blocksByAddress中所携带的shuffle明文的地址反馈,如果blockManager完全相近的executor与近期reduce 护航的executor保持一致,则将该的blockManager完全相近的shuffle明文存储器反馈放入localBlocks以下中所;否则,如果blockManager所在的路由器与近期reduce 护航的路由器保持一致,则将该blockManager完全相近的shuffle明文存储器反馈抽出hostLocalBlocks以下中所;否则shuffle明文反馈发挥作用于远程路由器中所,将完全相近的shuffle明文存储器反馈抽出fetchRequests配置文件中所:

for ((address, blockInfos)

//blockManager完全相近的executor与近期reduce 护航的executor保持一致

if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {

checkBlockSizes(blockInfos)

val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(

blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)

numBlocksToFetch += mergedBlockInfos.size

localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex))

localBlockBytes += mergedBlockInfos.map(_.size).sum

} else if (blockManager.hostLocalDirManager.isDefined &&

address.host == blockManager.blockManagerId.host) { //blockManager所在的路由器与近期reduce 护航的路由器保持一致

checkBlockSizes(blockInfos)

val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(

blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)

numBlocksToFetch += mergedBlockInfos.size

val blocksForAddress =

mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))

hostLocalBlocksByExecutor += address -> blocksForAddress

hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))

hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum

} else { //否则shuffle明文反馈发挥作用于远程路由器中所

remoteBlockBytes += blockInfos.map(_._2).sum

collectFetchRequests(address, blockInfos, collectedRemoteRequests)

}

取例外的是,从远端拉取样本的持续性下如果样本量太大非常容易导致的网络阻塞,因此spark中所通过targetRemoteRequestSize 来限制reduce task每次远程拉取的样本量,如果超过该阈取值则将近期的block封装为一个FetchRequest并放在到collectedRemoteRequests以下中所作为全面性样本拉取的一个基本单元:

if (curRequestSize>= targetRemoteRequestSize || mayExceedsMaxBlocks) {

curBlocks = createFetchRequests(curBlocks, address, isLast = false,

collectedRemoteRequests)

private def createFetchRequests(

curBlocks: Seq[FetchBlockInfo],

address: BlockManagerId,

isLast: Boolean,

collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {

val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks, doBatchFetch)

numBlocksToFetch += mergedBlocks.size

var retBlocks = Seq.empty[FetchBlockInfo]

if (mergedBlocks.length

collectedRemoteRequests += createFetchRequest(mergedBlocks, address)

} else {

mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>

if (blocks.length == maxBlocksInFlightPerAddress || isLast) {

collectedRemoteRequests += createFetchRequest(blocks, address)

其中所,targetRemoteRequestSize 的取值为 math.max(maxBytesInFlight / 5, 1L),maxBytesInFlight 通过固定式项SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024(可选48M)来以外;

如果的网络带最宽处不是瓶颈并且须要拉取的shuffle样本量相当大,则可以前提调大REDUCER_MAX_SIZE_IN_FLIGHT即固定式项spark.reducer.maxSizeInFlight的取值,反之亦然。

2)上去的步骤根据shuffle明文存储器的位置各有不同得不到了三个催促以下,年中分别借助各个以下中所的样本:

// Send out initial requests for blocks, up to our maxBytesInFlight

fetchUpToMaxBytes() //跨路由器拉取样本

// Get Local Blocks

fetchLocalBlocks()

if (hostLocalBlocks.nonEmpty) {

blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks)

}

以借助远端的shuffle明文拉取为例,ShuffleBlockFetcherIterator结点fetchRequests,首到时得不到各个request完全相近的blockManager,然后向该blockManager发送样本拉取催促:

while (isRemoteBlockFetchable(fetchRequests)) {

val request = fetchRequests.dequeue()

val remoteAddress = request.address

send(remoteAddress, request)

}

}

实际上的催促发送是在NettyBlockTransferService的fetchBlocks分析方法中所动手到的,首到时创建者TransportClient最简单,然后由OneForOneBlockFetcher根据TransportClient最简单、appId、executorId等向样本所在的BlockManager发送拉取消息FetchShuffleBlocks并解决问题返回的结果:

@Override

public void fetchBlocks(

String host,

int port,

String execId,

String[] blockIds,

BlockFetchingListener listener,

DownloadFileManager downloadFileManager) {

RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =

(inputBlockId, inputListener) -> {

// Unless this client is closed.

if (clientFactory != null) {

TransportClient client = clientFactory.createClient(host, port, maxRetries> 0);

new OneForOneBlockFetcher(client, appId, execId,

inputBlockId, inputListener, conf, downloadFileManager).start();

当完全相近BlockManager的NettyBlockRpcServer接收到FetchShuffleBlocks消息后,则根据ShuffleBlockId子程序BlockManager的getLocalBlockData分析方法从本地的shuffle明文中所读过取所需要的样本:

case fetchShuffleBlocks: FetchShuffleBlocks =>

val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>

if (!fetchShuffleBlocks.batchFetchEnabled) {

fetchShuffleBlocks.reduceIds(index).map { reduceId =>

blockManager.getLocalBlockData(

ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))

}

取例外的是,在getLocalBlockData分析方法的动手到预定义中所我们碰到了上去提到的IndexShuffleBlockResolver的最简单:

override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {

if (blockId.isShuffle) {

logDebug(s"Getting local shuffle block ${blockId}")

try {

shuffleManager.shuffleBlockResolver.getBlockData(blockId)

}

由于IndexShuffleBlockResolver都可在shuffle map过渡阶段明文拆分的流程中所创建者了index明文,在reduce过渡阶段就可以根据shuffleId、mapId等反馈得不到确切的index明文,然后根据reduceId借助完全相近北区的样本阔度取值在index明文中所的x,短时间地从data明文中所定位到完全相近partition的样本:

override def getBlockData(

blockId: BlockId,

dirs: Option[Array[String]]): ManagedBuffer = {

val (shuffleId, mapId, startReduceId, endReduceId) = blockId match {

case id: ShuffleBlockId =>

(id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)

val indexFile = getIndexFile(shuffleId, mapId, dirs)

val channel = Files.newByteChannel(indexFile.toPath)

channel.position(startReduceId * 8L)

val in = new DataInputStream(Channels.newInputStream(channel))

try {

val startOffset = in.readLong()

channel.position(endReduceId * 8L)

val endOffset = in.readLong()

val actualPosition = channel.position()

val expectedPosition = endReduceId * 8L + 8

if (actualPosition != expectedPosition) {

throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +

s"expected $expectedPosition but actual position was $actualPosition.")

}

new FileSegmentManagedBuffer(

transportConf,

getDataFile(shuffleId, mapId, dirs),

startOffset,

endOffset - startOffset)

}

2、制订生成

如果以外了生成函多达则子程序生成容器(Aggregator)的combine CombinersByKey分析方法在reduce端对样本进行时生成:

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {

if (dep.mapSideCombine) {

// We are reading values that are already combined

val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]

dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)

} else {

// We don't know the value type, but also don't care -- the dependency *should*

// he made sure its compatible w/ this aggregator, which will convert the value

// type to the combined type C

val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]

dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)

}

}

3、制订到时后顺序排列

如果须要根据key动手到时后顺序排列(例如sortByKey黎曼),则子程序ExternalSorter的insertAll分析方法对样本进行时文件系统,当文件系统空间内未能适配MLT-到时在明文系统中所到时后顺序排列然后制订溢写成,这个流程和map过渡阶段insertAll分析方法类似,reduce过渡阶段的输出又可以作为下一个shuffle map过渡阶段或者是action的样本源:

// Sort the output if there is a sort ordering defined.

val resultIter = dep.keyOrdering match {

case Some(keyOrd: Ordering[K]) =>

// Create an ExternalSorter to sort the data.

val sorter =

new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)

sorter.insertAll(aggregatedIter)

context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)

context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)

context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)

// Use completion callback to stop sorter if task was finished/cancelled.

context.addTaskCompletionListener[Unit](_ => {

sorter.stop()

})

CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())

揭示

Shuffle的解决问题流程较为适合于,并且由于其到时后顺序排列、生成、闪存溢写成以及样本派送等流程必然才会诱发CPU、明文系统、闪存IO以及的网络通信等上都的可靠性开支,我们在实际上的业务范围技术开发中所须要尽量避免shuffle,或者通过样本屏蔽和中所间结果文件系统等方式为尽量减少shuffle带来的可靠性影响。

下面根据本文的章节,Shuffle的物理现象主要揭示如下:

1、Shuffle的诱发从根本上是由从父子RDD的北区容器确实保持一致决定的,北区容器各有不同则必然诱发护航之间的样本派送;

2、Shuffle的流程主要分为map和reduce两个过渡阶段,也即shuffle write和shuffle read过渡阶段:

在shuffle write过渡阶段,根据ShuffleHandle的各有不同,shuffle写成闪存的流程将换用各有不同的ShuffleWriter的动手到类,本文详细参阅了其中所最经典的动手到方式为SortShuffleWriter,该模式通过PartitionedAppendOnlyMap样本构件在map端将key取值相近的样本生成不久再继续进行时到时后顺序排列和溢写成,换用该构件可以减少样本纪录占用的明文系统空间内从而急剧提高shuffle的制订可靠性;BypassMergeSortShuffleWriter 则是上到了明文系统拆分和到时后顺序排列的流程,从外部将shuffle样本溢写成到完全相近北区的临时明文中所;而换用UnsafeShuffleWriter可以利用到Tungsten明文系统模式的时可,通过元组多达第三组来第三组织样本纪录,不仅减少了明文系统空间内的占用,而且急剧减少了样本都可的创建者从而降低JVM的列车运行冲击。

在shuffle read过渡阶段,首到时根据shuffleId从mapStatuses中所得不到完全相近的MapStatus以下,然后结合reduceId给予reduce护航完全相近的所有shuffle明文的存储器反馈,并根据明文所在的存储器位置将shuffle纪录分配到各有不同的以下中所并分别制订样本拉取;如果表述了拆分和到时后顺序排列算法则到时在明文系统中所进行时拆分和到时后顺序排列不久再继续溢写成到闪存中所,否则从外部将该北区的样本写成入完全相近的闪存明文中所,并作为下一个shuffle read过渡阶段或者action黎曼的输入。

原作者:焦媛

中选阅读过:

大样本技术开发 Spark 模块之SparkSQL

大样本技术开发之Spark参考书

大样本技术开发之Spark 基础参考书求学

「投到」大样本技术开发之Spark面谈八股文

合肥看白癜风的医院
长治白癜风医院哪家好
郑州哪里专业治白癜风
胃烧心吃什么药好
艾司奥美拉唑和金奥康哪个好
哪个牌子的血糖仪准
抑制胃酸可以吃金奥康奥美拉唑吗
自家用哪种测血糖仪好些
标签:数据原理
电影《爱的富硒白云》宜春首映 高评如潮

街水度假捐助者为首合拍。慈溪余杭四同年天华谊兄弟有限责任公司发行,山西菲尔幕入股有限责任公司促销。9同年17日,恐怖电影《亲爱的的福砷泉水》月底上映,泡福砷温泉街水、遇见极致真为亲爱的! ...

友情链接