从源码剖析一个Spark WordCount Job执行的全过程

WordCount可以说是分布式数据处理框架的”Hello World”,我们可以以它为例来剖析一个Spark Job的执行全过程。

我们要执行的代码为:

1
sc.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

只有一行,很简单也很经典的代码。这里的collect作为一个action,将触发一个Job,现在我们从源码开始剖析这个Job执行的全部过程。我这次读的源码是Spark 1.4.1的release版本。

为了方便描述,我们把上面的代码先进行一下拆分,这样可以清晰的看到每一步生成的RDD及其依赖关系,并方便下面分析时进行引用:

1
2
3
4
5
val hadoopRDD0 = sc.textFile("hdfs://...") // HadoopRDD[0]
val mapPartitionsRDD1 = hadoopRDD0.flatMap(_.split(" ")) // MapPartitionsRDD[2]
val mapPartitionsRDD2 = mapPartitionsRDD1.map((_, 1)) // MapPartitionsRDD[2]
val shuffledRDD3 = mapPartitionsRDD2.reduceByKey(_+_) // ShuffledRDD[3]
shuffledRDD3.collect // action

collect触发Job

首先,collect调用了SparkContext上的runJob方法。这个方法是一个阻塞方法,会在Job完成之前一直阻塞等待,直到Job执行完成之后返回所得的结果:

RDD.collect

1
2
3
4
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

需要注意的是这里传入了一个函数,这个函数就是这个Job的要执行的任务。后面我们可以看到,它将会被包装并序列化后发送到要执行它的executor上,并在要处理的RDD上的每个分区上被调用执行。

DAGScheduler提交Job

SparkContext的runJob被调用之后,这个Job的信息被递传给了SparkContext持有的一个DAGScheduler上。DAGScheduler本身维护着一个消息队列,在收到这个Job之后,将给自己的消息队列发送一个JobSubmitted消息。这个消息中包含了新生成的一个JobId, 触发action的RDD,经过清理后的闭包函数,要处理的各个分区的在RDD中的索引,以及一些其他信息。

DAGScheduler的消息队列在收到JobSubmitted消息后,将触发调用handleJobSubmitted方法。在这个方法中,首先会根据这个触发action的RDD的依赖信息计算出这个Job的所有Stage。在这个WordCount中,我们是在reduceByKey生成的shuffledRDD3(其生成的过程涉及到通用的combineByKey方法,具体可以参考这篇文章)上触发的action,所以我们的ResultStage所对应的finalRDD就是shuffledRDD3,ResultStage所要执行的就是shuffledRDD3的所有分区。shuffledRDD3有一个ShuffleDependency,指向mapPartitionsRDD2,据此ShuffleDependency会生成一个ShuffleMapStage,它是ResultStage的父Stage。

根据继承关系分析Stages

在分析出所有的Stage之后,DAGScheduler会根据ResultStage创建出一个ActiveJob对象,用来表示这个活跃的Job。然后提交ResultStage,但是在真正执行这个Stage之前,先递归的判断它有没有父Stage,若有的话先提交它的父Stage,并将当前Stage加入等待队列;若没有父Stage,才会真正的开始执行这个Stage。等待队列中的Stage,会在父Stage都执行完成之后再被执行。

由此可以看出,在一个Job中,Stage之间必须按序执行,后一个Stage的执行将依赖前一个Stage的结果。一个Job只会有一个ResultStage,并且这个ResultStage一定会是整个Job的最后一个Stage,所以ResultStage执行的结束也就标志着整个Job的结束。

Task的创建和提交

按照之前的分析,我们的Job一共有两个Stage,一个ShuffleMapStage,一个ResultStage,并将先执行ShuffleMapStage。在执行Stage的时候,会按此Stage对应的RDD的分区数量,对应每一个分区创建一个Task。如果是ShuffleMapStage则创建ShuffleMapTask,如果是ResultStage则创建ResultTask。这些Task在后面将会被序列化后发到其他的executor上面去运行。

在这里分析一下每个Task包含哪些信息
两种Task都会包含的信息有 (1)当前Stage对应的RDD对象(轻量级) (2)当前Stage的ID (3)要处理的那个分区信息(轻量级),以及该任务可能的最优执行位置(例如,对于hdfs上的文件,HadoopRDD中会记录其每一个分区存储在集群的位置,并将这个位置通过依赖继承到其子RDD)

除此之外,ShuffleMapTask还包含了对应的ShuffleDependency的对象(这其中实际上有分区的方法,数据合并的方法等计算时所需的信息);ResultTask还包含了当前这个Job最终要执行在每个数据上的函数(在此情况下就是collect传给SparkContext的那个函数)。

在对每个要处理的分区创建出各个Task之后,DAGScheduler会将同一个Stage的各个Task合并成一个TaskSet,并将其提交给TaskScheduler。至此,调度这些Task的工作就交给了TaskScheduler来进行。

TaskScheduler在收到这个TaskSet之后,首先为其创建一个TaskSetManager,这个TaskSetManager将辅助任务的调度。然后TaskScheduler将会调用SchedulerBackend上的reviveOffers方法去申请可用的资源。

SchedulerBackend分配资源(executors)和发送Task

SchedulerBackend是一个接口,它在不同的部署模式下会有不同的实现(实际上TaskScheduler也是这样)。SchedulerBackend的作用是调度和控制整个集群里面的资源(我是这么理解的,这里的资源指的是可用的executors),当reviveOffers方法被调用后,它会将当前可用的所有资源信息,通过调用TaskScheduler的resourceOffers提供给TaskScheduler(实际上这个过程是通过另一个EndPoint类以消息队列的方式实现的,这样可以保证同时只会进行一个对资源的申请或释放过程)。

TaskScheduler在收到当前所有可用的资源信息后,会将这些资源信息按序提供给当前正在执行的多个TaskSet,每个TaskSet再根据这些资源信息将当前可以执行的Task序列化后包装到一个TaskDescription对象中返回(这个TaskDescription对象中也包含了这个任务将要运行在哪个executor上),最终通过TaskScheduler将所有当前的资源情况可以执行的Task对应的TaskDescription返回给SchedulerBackend。

SchedulerBackend这时才根据每个TaskDescription将executors资源真正的分配给这些Task,并记录已分配掉的资源和剩余的资源,然后将TaskDescription中序列化后的Task通过网络(Spark使用akka框架)发送给它对应的executor。

executor执行Task

集群中的executor在收到Task后,申请一个线程开始运行这个Task。这是整个Job中最核心的部分了,真正的计算都在这一步发生。首先将其反序列化,然后调用这个Task对象上的runTask方法。在这里对于ShuffleMapTask和ResultTask,runTask方法有着不同的实现,并将返回不同的内容。我们分别来分别分析。

对于ShuffleMapTask,runTask首先获取对应的RDD和ShuffleDependency。在这里对应的RDD是mapPartitionsRDD2,ShuffleDependency中则有着合并的计算信息。然后调用RDD的iterator方法获取一个对应分区数据的迭代器。如果当前RDD分区的数据已经在之前计算过了,则会直接去内存或磁盘中获取,否则在此时就会调用mapPartitionsRDD2的compute方法,根据其依赖去计算它的分区数据。如果ShuffleDependency中的mapSideCombine标记为true,就会将iterator方法返回的分区数据在这里(也就是map端)进行合并(此时要求ShuffleDependency中的aggregator不为空,aggregator中包含了如何将数据进行合并的信息)。然后根据ShuffleDependency中的partitioner(默认是一个HashPartitioner)计算出每条数据在其结果端(就是shuffleRDD3中)的分区,并将其写入到本地磁盘中对应的文件中去(在这里写入方法有多种实现方式,1.4.1的版本默认是用了SortShuffleManager,还有的其他实现是HashShuffleManager和UnsafeShuffleManager,具体的实现方法在此处就不详说了)。当分区的每条数据都处理完后,runTask会返回一个MapStatus,这其中包含了一个BlockManagerId(标记了这个任务被执行的位置,也就是Map后的数据存储的位置)以及每个结果分区(每个reduceId)的数据的大小信息。最后这个MapStatus将通过网络发回给driver,dirver将其记录。

ShuffleMapTask.runTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
return writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}

对于ResultTask,runTask首先也是获取对应的RDD和要在数据上执行的函数func。在这里对应的RDD应该是shuffleRDD3,然后调用RDD上的iterator获取这个分区的数据,并将其传入func函数中,将func函数的返回值作为runTask的返回值返回。过程看似简单,实际上在shuffleRDD3上调用iterator时就对应了shuffle的reduce端的合并。从shuffleRDD3的compute方法的实现可以看出,它的每个分区的数据都要去执行了ShuffleMapTask的executor上面获取,所以会产生大量的网络流量和磁盘IO。这个过程就是MapReduce范式中的shuffle过程,这里面还有很多的细节我并没有详述,但是这个过程十分关键,它的实现效率直接决定了分布式大数据处理的效率。

ResultTask.runTask

1
2
3
4
5
6
7
8
9
10
11
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}

executor返回结果

在runTask计算结束返回数据后,executor将其返回的数据进行序列化,然后根据序列化后数据的大小进行判断:如果数据大与某个值,就将其写入本地的内存或磁盘(如果内存不够),然后将数据的位置blockId和数据大小封装到一个IndirectTaskResult中,并将其序列化;如果数据不是很大,则直接将其封装入一个DirectTaskResult并进行序列化。最终将序列化后的DirectTaskResult或者IndirectTaskResult递传给executor上运行的一个ExecutorBackend上(通过statusUpdate方法)。

ExecutorBackend如上面的SchedulerBackend有着相似的功能(实际上,对于local模式,这两个类都由一个LocalBackend实现),将结果封入一个StatusUpdate消息透传给一个对应的EndPoint类,EndPoint类中收到这个消息后将该消息再通过网络发送给driver。

driver接收executor返回的结果并释放资源

在driver端的SchedulerBackend收到这个StatusUpdate消息之后,将结果续传给TaskScheduler,并进行资源的释放,在释放资源后再调用一次reviveOffers,这样又可以重复上面所描述的过程,将释放出来的资源安排给其他的Task来执行。

TaskResultGetter解析并拉取结果

TaskScheduler在收到任务结果后,将这个任务标记为结束,然后使用一个TaskResultGetter类来进行结果的解析。TaskResultGetter将结果反序列化,判断如果其是一个DirectTaskResult则直接抽取出其中的结果;如果是一个IndirectTaskResult则需要根据其中的blockId信息去对应的机器上拉取结果。最终都是将结果拉取到driver的内存中(这就是我们最好不要在大数据集上执行类似collect的方法的原因,它会将所有的数据拉入driver的内存中,造成大量的内存开销,甚至内存不足)。然后TaskResultGetter会将拉取到的结果递交给TaskScheduler,TaskScheduler再将此结果递交给DAGScheduler。

处理结果并在Job完成时返回

DAGScheduler在收到Task完成的消息后,先判断这完成的是一个什么任务。如果是一个ShuffleMapTask则需要将返回的结果(MapStatus)记录到driver中,并判断如果当前的ShuffleMapStage若是已经完成,则去提交下一个Stage。如果是一个ResultTask完成了, 则将其结果递交给JobWaiter,并标记这个任务以完成。

JobWaiter是DAGScheduler在最开始submitJob的时候创建的一个对象,用于阻塞等待任务的完成,并进行结果的处理。JobWaiter在每收到一个ResultTask的结果时,都将结果在resultHandler上执行。这个resultHandler则是由SparkContext传进来的一个函数,其作用是将数据放入一个数组中,这个数组最终将作为SparkContext.runJob方法的返回值,被最开始的collect方法接收然后返回。若JobWaiter收到了每个ResultTask的结果,则表示整个Job已经完成,此时就停止阻塞等待,于是SparkContext.runJob返回一个结果的数组,并由collect接收后返回给用户程序。

至此,一个Spark的WordCount执行结束。

总结

本文从源码的角度详细分析了一个Spark Job的整个执行、调度的过程,不过很多东西还只是浅尝辄止,并未完全深入。尽管如此,经过连续好几天的分析,我还是觉得收获颇丰,对Spark的实现原理有了更加深入的理解,甚至对MapReduce的编程范式以及其shuffle过程也增加了不少理解。PS:其实从一开始我到分析结束都是没有做任何记录的,只因为一直一知半解实在不知道如何来做记录,所以只是去查阅一些资料和使劲儿的阅读源码。在我自认为分析结束后,我才开始写这篇记录,但是在写的过程中我才发现我分析的过程有一些并不是很清晰,然后重新去看,才真正弄的比较清晰了。可见写博文是很重要的过程,不仅是将学到的知识分享出来,而且对自身的知识也有很好的加固作用。