`
baishuo491
  • 浏览: 77251 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spark源码分析--rdd和stage的生成(更新了一张图)

 
阅读更多
原创,转载请注明出处  http://baishuo491.iteye.com/blog/2019510 ,作者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕 作者单位:亚信联创大数据平台部

从一个简单的例子,来看rdd的演化,和stage是如何生成的(过程灰常之复杂和抽象,请参考附件的图来理解)
object BaiWordCount2 {
  def main(args: Array[String]) {
    .....
    // Create the context
    val ssc = new SparkContext(args(0), "BaiWordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
    val lines = ssc.textFile(args(1))//基于hadoopRdd,创建了一个MapRdd[String] 
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x =>{ println("x:"+ x);(x, 1) } )//这回返回的是一个元组了
    val red = wordCounts.reduceByKey( (a,b)=>{a + b} )
    red.saveAsTextFile("/root/Desktop/out") 
  }
}



我们首先总结出每个操作生成的rdd,这是通过源码追踪得到的:
textFile:生成一个HadoopRDD 和 一个MapPedRDD[String]
flatMap:前面生成的MapPedRDD[String],调用flatMap函数,生成了FlatMaPPedRDD[String]
map:FlatMaPPedRDD[String],调用map函数,创建了一个创建了一个MapPartitionsRDD,再通过隐式转换,生成了PairRDD[String,Int](因为map操作,产生了一对值key和value)
reduceByKey:生成三个Rdd,首先根据之前的PairRDD,生成一个MapPartitionsRDD(这个RDD起到类似map-reduce里面,combine()的作用),再生成一个shuffledRdd(这个rdd是分割stage的重要依据之一),这之后再生成一个MapPartitionsRDD[String](这个rdd起到hadoop里reducer的作用)
saveAsTextFile先生成一个MapPedRDD,然后调用runJob函数,将之前生成的rdd链表,提交到spark集群上,spark根局rdd的类型,划分成一个或多个stage(只有shuffledRdd这类的rdd,才会成为stage和stage之间的边界),然后将各个stage,按照依赖的先后顺序,将stage先后提交集群进行计算
下边通过textFile来详细说明rdd链表的生成过程和主要数据结构,主要注意deps和几种dependency:
从rdd生成的方式来说可以分成四类:通过外部数据生成rdd,通过transformations函数生成,缓存操作,actions操作
textFile函数无疑属于第一种,通过外部数据,生成rdd,输入的文件路径,可以是hdfs://开头的hdfs数据,也可以本地文件路径,例如"/root/Desktop/word.text"
textFile函数调用hadoopFile 函数,生成一个hadoopRdd[K,V],默认情况下,泛型参数K和V,对应HadoopRDD的构造函数里的keyClass和valueClass。
也就是一个Rdd[LongWritable,Text],通过外部数据生成rdd的第一个rdd的特点是,deps是一个空的list,原因是它是从外部文件生成的,没有父rdd
生成了Rdd[LongWritable,Text]后,还要调用transformations函数map:map(pair => pair._2.toString),来生成一个MappedRDD
MappedRDD(this, sc.clean(f)),这里this,就是之前生成的HadoopRDD,MappedRDD的构造函数,会调用父类的构造函数RDD[U](prev)
这个this(也就是hadoopRdd),会被赋值给prev,然后调用RDD.scala中,下面的构造函数
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))

这个函数的作用,是把父RDD的SparkContext(oneParent.context),和一个列表List(new OneToOneDependency(oneParent))),传入了另一个RDD的构造函数,
 RDD[T: ClassManifest](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  )

这样我们可以看到,在所有有父子关系的RDD,共享的是同一个SparkContext。而子RDD的deps变量,也被赋值为一个List,里面包含一个OneToOneDependency实例,表明父RDD和子RDD之间的关系
其实大多数的父子关系,包含的都是OneToOneDependency.比较例外的几个,比如join,这个很明显,他的数据不是来自同一个父RDD。而shuffledRdd的Dependency是ShuffledDependency

父Rdd会在子rdd的构造函数中被传入,然后放入子rdd实例的deps里面,被记录下来。这样,当我们得到一个Rdd之后,就可以向后回溯它的祖先,再结合传入的函数变量f,完整的得到它的构造过程。
flatMap,map,reduceByKey,saveAsTextFile则按顺序创建各自的rdd,然后在deps中记录父rdd,同时根据rdd的类型,生成各自的不同类型的dependency。
在saveAsTextFile函数把整个计算任务提交到集群之前,所有的函数进行的操作,仅仅就是生成rdd链表而已。saveAsTextFile是action类型的操作,action的共同特点是,会调用RunJob一类的函数,调用Dagscheduler.runJob,将最后一个rdd(在我们这个例子里,就是saveAsTextFile生成的那个MappedRdd),提交到集群上。集群会以这个rdd为参数之一,生成一个stage,名叫finalStage(故名思意,这是最终的一个stage)。然后调用submitStage,将刚刚生成的finalStage提交到集群上。这个stage是否会被马上执行呢?不一定,因为程序会调用getMissingParentStages,进行寻找,是否有需要先进行提交的stage---这个过程可以这样类比,一个查询操作,在提交之后,要先检查是否有子查询,如果有,先执行子查询,然后在执行父查询,这里的原因很简单,父查询依赖于子查询的数据。同理,在stage执行的过程中,也要先查询,它是否需要其他stage的数据(其实之后一种数据,就是通过shuffle传过来的数据),如果有,那么这些stage,就是它的missingParentStage,它要等他的missingParentStage执行成功,然后通过shuffle机制把数据传给它,才能开始执行。这个过程的执行过程如下:从最后一个rdd起,查看它的dependency的类型,如果是
shuffledDependency,则创建一个ShuffleMapStage,否则,就向前遍历,依次递归,知道最前面的rdd为止
 private def getMissingParentStages(stage: Stage): List[Stage] = {
    .....
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {/
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_,_] =>
               val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                visit(narrowDep.rdd)
            }
          }
        }
      }
    }
    visit(stage.rdd)
    missing.toList
  }

当getMissingParentStage(stage)的结果为空的时候,表明这个stage没有missingParentStage,或者它的missingParentStage已经都执行完了,则当前这个stage才能被成功的提交到集群去执行,否则,它就要等待,并重复调用getMissingParentStage,直到它的结果为空,才可以被提交。

提交执行的过程会另开一篇
  • 大小: 142 KB
分享到:
评论
7 楼 QIAOtinger 2014-10-05  
   
6 楼 tanzek 2014-04-05  
想请问楼主怎么调试源码呢?用idea的本地运行功能吗?
5 楼 lzh8189146 2014-04-02  
我只是个新生啊,关注你,多向你学习学习,别拉黑啊  (众神皆WO奴)
4 楼 baishuo491 2014-04-02  
lzh8189146 写道
原来楼主就是 爱看历史的码农 ,久仰久仰
晕,你的新浪微博是啥?
3 楼 lzh8189146 2014-04-02  
原来楼主就是 爱看历史的码农 ,久仰久仰
2 楼 baishuo491 2014-02-24  
wzgl2014 写道
textFile函数从外部数据生成HadoopRDD,转MapPedRDD的时候,外部数据并没有加载到内存吧,懒加载的方式?

是的,在saveasTextFile之前,都没有开始计算,仅仅是在进行构建计算的流程,每个rdd,都是一个计算单元,它们连接起来,构成一个完整的计算过程。saveasTextFile函数,把job提交到集群,这才根据之前构建的计算流程,开始真正从外部数据源读数据
1 楼 wzgl2014 2014-02-24  
textFile函数从外部数据生成HadoopRDD,转MapPedRDD的时候,外部数据并没有加载到内存吧,懒加载的方式?

相关推荐

    Spark学习--RDD编码

    当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...

    spark-kafka-rdd:使Kafka成为Spark平台数据源的scala库

    请注意,Spark-Kafka-RDD 从 Kafka 主题和分区中获取给定的偏移范围作为单个 RDD ( KafkaRDD ) 返回给 Spark 驱动程序,而不是生成 Spark 流框架所做的 Spark DStream 。特征Spark-Kafka-RDD 有几个有用的特性。 ...

    spark-rdd-APi

    内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用

    过去三十年-RCT-DID-RDD-LE-ML-DSGE等方法的“高光时刻”路线图.docx

    过去三十年-RCT-DID-RDD-LE-ML-DSGE等方法的"高光时刻"路线图全文共6页,当前为第1页。过去三十年-RCT-DID-RDD-LE-ML-DSGE等方法的"高光时刻"路线图全文共6页,当前为第1页。过去三十年,RCT,DID,RDD,LE,ML,DSGE等...

    spark源码分析.pdf

    spark源码分析,RDD、Iterator、Job、DAG、Stage、Taskset、task等

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    spark3.0入门到精通

    │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种模式演示.mp4 │ ...

    spark学习-RDD的实验

    spark学习-RDD的实验

    2015 Spark技术峰会-Spark SQL结构化数据分析-连城

    Databrciks工程师,Spark Committer,Spark SQL主要开发者之一的连城详细解读了“Spark SQL结构化数据分析”。他介绍了Spark1.3版本中的很多新特性。重点介绍了DataFrame。其从SchemaRDD演变而来,提供了更加高层...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 ...Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    Spark学习---统计文件单词出现次数

    上一节我们简单介绍了RDD中转化和执行操作的用法,本节将通过一个具体的示例来加深对RDD的认识。 一.需求 统计本地文件中单词出现次数 二.操作流程 1.读取外部文件创建JavaRDD; 2.通过flatMap转化操作切分字符...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    Spark源码深度解读

    Spark源码解读迷你 RDD、Spark Submit、Job、Runtime、Scheduler、Spark Storage、Shuffle、Standlone算法、Spark On yarn。。。

    spark-project-20190620.zip

    20190620自学spark,实现用java调用spark rdd api,主要侧重于lambda方式调用,本源码下载后直接可运行,因使用maven,只用前需先安装maven并保持网络正常。初次使用比较耗时,需要下载依赖包

    Spark源码系列(二)RDD详解

    上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。RDD的全名是ResilientDistributedDataset,意思是容错的分布式数据集,每一个RDD都会有5个...

    spark-textFile构建RDD的分区及compute计算策略

    spark-textFile构建RDD的分区及compute计算策略

    playing-with-spark-rdd:Apache Spark RDD示例

    玩火花rdd Apache Spark RDD示例示例,用于学习Spark RDD和DataSet API。

    spark-RDD的特性介绍及源码阅读必备基础

    spark-RDD的特性介绍及源码阅读必备基础

    spark-rdd-sample1

    spark-rdd-sample1

    Spark-RDD.md

    Spark_RDD

Global site tag (gtag.js) - Google Analytics