热门IT资讯网

第12课:Spark Streaming源码解读之Execu

发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新 2024年11月24日,Receiver接收到的数据交由ReceiverSupervisorImpl来管理。ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTrac

Receiver接收到的数据交由ReceiverSupervisorImpl来管理。

ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTracker 。

Executor的数据容错可以有三种方式:

  1. WAL日志

  2. 数据副本

  3. 接收receiver的数据流回放

/** Store block and report it to driver */def pushAndReportBlock(    receivedBlock: ReceivedBlock,    metadataOption: Option[Any],    blockIdOption: Option[StreamBlockId]  ) {  val blockId = blockIdOption.getOrElse(nextBlockId)  val time = System.currentTimeMillis  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")  val numRecords = blockStoreResult.numRecords  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))  logDebug(s"Reported block $blockId")}

数据的存储,是借助receiverBlockHandler,它的实现有两种方式:

privateval receivedBlockHandler: ReceivedBlockHandler = {  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {    if (checkpointDirOption.isEmpty) {      throw new SparkException(        "Cannot enable receiver write-ahead log without checkpoint directory set. " +          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +          "See documentation for more details.")    }    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)  } else {    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)  }}


WriteAheadLogBaseBlockHandler 一方面将数据交由BlockManager管理,另一方面会写WAL日志。

一旦节点崩溃,可以由WAL日志恢复内存中的数据。在WAL开始时,就不在建议数据存储多个副本。

privateval effectiveStorageLevel = {  if (storageLevel.deserialized) {    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +      s" write ahead log is enabled, change to serialization false")  }  if (storageLevel.replication > 1) {    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +      s"write ahead log is enabled, change to replication 1")  }  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)}


而BlockManagerBaseBlockHandler直接将数据交由BlockManager管理。

如果不写WAL,当节点崩溃了一定会数据丢失吗? 这个也不一定。因为在构建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的时候会将receiver的storageLevel传入。storageLevel用来描述数据保存的地方(内存、磁盘)以及副本个数。

class StorageLevel private(    private var _useDisk: Boolean,    private var _useMemory: Boolean,    private var _useOffHeap: Boolean,    private var _deserialized: Boolean,    private var _replication: Int = 1)  extends Externalizable

公有如下种类的StorageLevel:

val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(false, false, true, false)


默认情况,数据采用MEMORY_AND_DISK_2,也就是说数据会产生两个副本,并且内存不足时会写入磁盘。


数据的最终存储是由BlockManager完成并管理的:

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {  var numRecords = None: Option[Long]  val putResult: Seq[(BlockId, BlockStatus)] = block match {    case ArrayBufferBlock(arrayBuffer) =>      numRecords = Some(arrayBuffer.size.toLong)      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,        tellMaster = true)    case IteratorBlock(iterator) =>      val countIterator = new CountingIterator(iterator)      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,        tellMaster = true)      numRecords = countIterator.count      putResult    case ByteBufferBlock(byteBuffer) =>      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)    case o =>      throw new SparkException(        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")  }  if (!putResult.map { _._1 }.contains(blockId)) {    throw new SparkException(      s"Could not store $blockId to block manager with storage level $storageLevel")  }  BlockManagerBasedStoreResult(blockId, numRecords)}


对于从kafka中直接读取数据,可以通过记录数据offset的方法来进行容错。如果程序崩溃,下次启动时,从上次未处理数据的offset再次读取数据即可。



备注:

1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


0