热门IT资讯网

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

发表于:2024-11-25 作者:热门IT资讯网编辑
编辑最后更新 2024年11月25日,本期内容:1、Spark Streaming资源动态分配2、Spark Streaming动态控制消费速率为什么需要动态?a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Stre

本期内容:

1、Spark Streaming资源动态分配

2、Spark Streaming动态控制消费速率

为什么需要动态?
a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。

b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。
Spark Streaming资源动态调整的时候会面临挑战:
Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。


Spark Streaming资源动态申请
1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。

// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")}_executorAllocationManager =if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(this, listenerBus, _conf))  } else {    None  }_executorAllocationManager.foreach(_.start())

设置spark.dynamicAllocation.enabled参数为true


这里会通过实例化ExecutorAllocationManager对象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。

/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = {  listenerBus.addListener(listener)val scheduleTask = new Runnable() {override def run(): Unit = {try {        schedule() //动态调整Executor分配数量      } catch {case ct: ControlThrowable =>throw ctcase t: Throwable =>          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)      }    }  }executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}


private def schedule(): Unit = synchronized {val now = clock.getTimeMillis  updateAndSyncNumExecutorsTarget(now) //更新Executor数量removeTimes.retain { case (executorId, expireTime) =>val expired = now >= expireTimeif (expired) {initializing = falseremoveExecutor(executorId)    }    !expired  }}
/** * Updates our target number of executors and syncs the result with the cluster manager. * * Check to see whether our existing allocation and the requests we've made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeededif (initializing) {// Do not change our target while we are still initializing,    // Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new    // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) {      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed")    }numExecutorsTarget - oldNumExecutorsTarget  } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded)    logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS * 1000delta  } else {0}}

动态控制消费速率:
Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。

0