三、spark--spark调度原理分析
[TOC]
一、wordcount程序的执行过程
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object WordCount { def main(args: Array[String]): Unit = { //创建spark配置文件对象.设置app名称,master地址,local表示为本地模式。 //如果是提交到集群中,通常不指定。因为可能在多个集群汇上跑,写死不方便 val conf = new SparkConf().setAppName("wordCount") //创建spark context对象 val sc = new SparkContext(conf) sc.textFile(args(0)).flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFile(args(1)) sc.stop() }}
核心代码很简单,首先看 textFile这个函数
SparkContext.scala def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() //指定文件路径、输入的格式类为textinputformat,输出的key类型为longwritable,输出的value类型为text //map(pair => pair._2.toString)取出前面的value,然后将value转为string类型 //最后将处理后的value返回成一个新的list,也就是RDD[String] //setName(path) 设置该file名字为路径 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }关键性的操作就是:返回了一个hadoopFile,它有几个参数:path:文件路径classOf[TextInputFormat]:这个其实就是输入文件的处理类,也就是我们mr中分析过的TextInputFormat,其实就是直接拿过来的用的,不要怀疑,就是酱紫的classOf[LongWritable], classOf[Text]:这两个其实可以猜到了,就是输入的key和value的类型。接着执行了一个map(pair => pair._2.toString),将KV中的value转为string类型
我们接着看看hadoopFile 这个方法
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration) // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) //看到这里,最后返回的是一个 HadoopRDD 对象 //指定sc对象,配置文件、输入方法类、KV类型、分区个数 new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
最后返回HadoopRDD对象。
接着就是flatMap(.split(" ")) .map((,1)),比较简单
flatMap(_.split(" ")) 就是将输入每一行,按照空格切割,然后切割后的元素称为一个新的数组。然后将每一行生成的数组合并成一个大数组。map((_,1))将每个元素进行1的计数,组成KV对,K是元素,V是1
接着看.reduceByKey(_+_)
这个其实就是将同一key的KV进行聚合分组,然后将同一key的value进行相加,最后就得出某个key对应的value,也就是某个单词的个数看看这个函数def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) } 这个过程中会分区,默认分区数是2,使用的是HashPartitioner进行分区,可以指定分区的最小个数
二、spark的资源调度
2.1 资源调度流程
图2.1 spark资源调度
1、执行提交命令,会在client客户端启动一个spark-submit进程(用来为Driver申请资源)。
2、为Driver向Master申请资源,在Master的waitingDrivers 集合中添加这个Driver要申请的信息。Master查看works集合,挑选出合适的Work节点。
3、在选中的Work节点中启动Driver进程(Driver进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。所以其实driver也需要资源,也只是跑在executor上的一个线程而已
4、Driver进程为要运行的Application申请资源(这个资源指的是Executor进程)。此时Master的waitingApps 中要添加这个Application申请的资源信息。这时要根据申请资源的要求去计算查看需要用到哪些Worker节点(每一个节点要用多少资源)。在这些节点启动Executor进程。
(注:轮询启动Executor。Executor占用这个节点1G内存和这个Worker所能管理的所有的core)
5、此时Driver就可以分发任务到各个Worker节点的Executor进程中运行了。
Master中的三个集合
val works = new HashSet[WorkInfo]() works 集合采用HashSet数组存储work的节点信息,可以避免存放重复的work节点。为什么要避免重复?首先我们要知道work节点有可能因为某些原因挂掉,挂掉之后下一次与master通信时会报告给master,这个节点挂掉了,然后master会在works对象里把这个节点去掉,等下次再用到这个节点是时候,再加进来。这样来说,理论上是不会有重复的work节点的。可是有一种特殊情况:work挂掉了,在下一次通信前又自己启动了,这时works里面就会有重复的work信息。 val waitingDrivers = new ArrayBuffer[DriverInfo]() 当客户端向master为Driver申请资源时,会将要申请的Driver的相关信息封装到master节点的DriverInfo这个泛型里,然后添加到waitingDrivers 里。master会监控这个waitingDrivers 对象,当waitingDrivers集合中的元素不为空时,说明有客户端向master申请资源了。此时应该先查看一下works集合,找到符合要求的worker节点,启动Driver。当Driver启动成功后,会把这个申请信息从waitingDrivers 对象中移除。 val waitingApps = new ArrayBuffer[ApplicationInfo]() Driver启动成功后,会为application向master申请资源,这个申请信息封存到master节点的waitingApps 对象中。同样的,当waitingApps 集合不为空,说明有Driver向Master为当前的Application申请资源。此时查看workers集合,查找到合适的Worker节点启动Executor进程,默认的情况下每一个Worker只是为每一个Application启动一个Executor,这个Executor会使用1G内存和所有的core。启动Executor后把申请信息从waitingApps 对象中移除。 注意点:上面说到master会监控这三个集合,那么到底是怎么监控的呢??? master并不是分出来线程专门的对这三个集合进行监控,相对而言这样是比较浪费资源的。master实际上是'监控'这三个集合的改变,当这三个集合中的某一个集合发生变化时(新增或者删除),那么就会调用schedule()方法。schedule方法中封装了上面提到的处理逻辑。
2.2 application和executor的关系
1、默认情况下,每一个Worker只会为每一个Application启动一个Executor。每个Executor默认使用1G内存和这个Worker所能管理的所有的core。
2、如果想要在一个Worker上启动多个Executor,在提交Application的时候要指定Executor使用的core数量(避免使用该worker所有的core)。提交命令:spark-submit --executor-cores
3、默认情况下,Executor的启动方式是轮询启动,一定程度上有利于数据的本地化。
什么是轮询启动???为什么要轮训启动呢???
轮询启动:轮询启动就是一个个的启动。例如这里有5个人,每个人要发一个苹果+一个香蕉。轮询启动的分发思路就是:五个人先一人分一个苹果,分发完苹果再分发香蕉。
为什么要使用轮询启动的方式呢???我们做大数据计算首先肯定想的是计算找数据。在数据存放的地方直接计算,而不是把数据搬过来再计算。我们有n台Worker节点,如果只是在数据存放的节点计算。只用了几台Worker去计算,大部分的worker都是闲置的。这种方案肯定不可行。所以我们就使用轮询方式启动Executor,先在每一台节点都允许一个任务。
存放数据的节点由于不需要网络传输数据,所以肯定速度快,执行的task数量就会比较多。这样不会浪费集群资源,也可以在存放数据的节点进行计算,在一定程度上也有利于数据的本地化。
2.3 spark的粗细粒度调度
粗粒度(富二代):
在任务执行之前,会先将资源申请完毕,当所有的task执行完毕,才会释放这部分资源。优点:每一个task执行前。不需要自己去申请资源了,节省启动时间。缺点:等到所有的task执行完才会释放资源(也就是整个job执行完成),集群的资源就无法充分利用。这是spark使用的调度粒度,主要是为了让stage,job,task的执行效率高一点
细粒度(穷二代):
Application提交的时候,每一个task自己去申请资源,task申请到资源才会执行,执行完这个task会立刻释放资源。优点:每一个task执行完毕之后会立刻释放资源,有利于充分利用资源。缺点:由于需要每一个task自己去申请资源,导致task启动时间过长,进而导致stage、job、application启动时间延长。
2.4 spark-submit提交任务对资源的限制
我们提交任务时,可以指定一些资源限制的参数:
--executor-cores : 单个executor使用的core数量,不指定的话默认使用该worker所有能调用的core--executor-memory : 单个executor使用的内存大小,如1G。默认是1G--total-executor-cores : 整个application最多使用的core数量,防止独占整个集群资源
三、整个spark资源调度+任务调度的流程
3.1 总的调度流程
https://blog.csdn.net/qq_33247435/article/details/83653584#3Spark_51
一个application的调度到完成,需要经过以下阶段:
application-->资源调度-->任务调度(task)-->并行计算-->完成
图3.1 spark调度流程
可以看到,driver启动后,会有下面两个对象:
DAGScheduler:据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。taskScheduler:taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。并且会根据每个task的执行情况监控,等到所有task执行完成后,就告诉master将所哟executor杀死
任务调度中主要涉涉及到以下流程:
1)、DAGScheduler:根据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。2)、taskScheduler:taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。3)、taskScheduler:taskScheduler节点会跟踪每一个task的执行情况,若执行失败,TaskScher会尝试重新提交,默认会重试提交三次,如果重试三次依然失败,那么这个task所在的stage失败,此时TaskScheduler向DAGScheduler做汇报。4)DAGScheduler:接收到stage失败的请求后,,此时DAGSheduler会重新提交这个失败的stage,已经成功的stage不会重复提交,只会重试这个失败的stage。(注:如果DAGScheduler重试了四次依然失败,那么这个job就失败了,job不会重试
掉队任务的概念:
当所有的task中,75%以上的task都运行成功了,就会每隔一百秒计算一次,计算出目前所有未成功任务执行时间的中位数*1.5,凡是比这个时间长的task都是挣扎的task。
总的调度流程:
=======================================资源调度=========================================1、启动Master和备用Master(如果是高可用集群需要启动备用Master,否则没有备用Master)。2、启动Worker节点。Worker节点启动成功后会向Master注册。在works集合中添加自身信息。3、在客户端提交Application,启动spark-submit进程。伪代码:spark-submit --master --deploy-mode cluster --class jarPath4、Client向Master为Driver申请资源。申请信息到达Master后在Master的waitingDrivers集合中添加该Driver的申请信息。5、当waitingDrivers集合不为空,调用schedule()方法,Master查找works集合,在符合条件的Work节点启动Driver。启动Driver成功后,waitingDrivers集合中的该条申请信息移除。Client客户端的spark-submit进程关闭。(Driver启动成功后,会创建DAGScheduler对象和TaskSchedule对象)6、当TaskScheduler创建成功后,会向Master会Application申请资源。申请请求发送到Master端后会在waitingApps集合中添加该申请信息。7、当waitingApps集合中的元素发生改变,会调用schedule()方法。查找works集合,在符合要求的worker节点启动Executor进程。8、当Executor进程启动成功后会将waitingApps集合中的该申请信息移除。并且向TaskSchedule反向注册。此时TaskSchedule就有一批Executor的列表信息。=======================================任务调度=========================================9、根据RDD的宽窄依赖,切割job,划分stage。每一个stage是由一组task组成的。每一个task是一个pipleline计算模式。10、TaskScheduler会根据数据位置分发task。(taskScheduler是如何拿到数据位置的???TaskSchedule调用HDFS的api,拿到数据的block块以及block块的位置信息)11、TaskSchedule分发task并且监控task的执行情况。12、若task执行失败或者挣扎。会重试这个task。默认会重试三次。13、若重试三次依旧失败。会把这个task返回给DAGScheduler,DAGScheduler会重试这个失败的stage(只重试失败的这个stage)。默认重试四次。14、告诉master,将集群中的executor杀死,释放资源。