三,火花,火花调度原理分析

  (TOC)

  

一、wordcount程序的执行过程

  
 <代码类=" language-scala ">进口org.apache.spark.rdd.RDD
  进口org.apache.spark。{SparkConf, SparkContext}
  
  对象WordCount {
  def主要(args:数组[String]):单位={//创建火花配置文件对象。设置应用名称,主人地址,当地表示为本地模式。//如果是提交到集群中,通常不指定。因为可能在多个集群汇上跑,写死不方便
  配置=new SparkConf瓦尔().setAppName (“wordCount”)//创建火花上下文对象
  val sc=new SparkContext(配置)
  
  sc.textFile .flatMap (_ (args (0))。分割(" "))
  . map ((_, - 1))
  .reduceByKey (_ + _)
  .saveAsTextFile (args (1))
  
  sc.stop ()
  }
  } 
  

核心代码很简单,首先看文本文件这个函数

  
 <代码> SparkContext.scala
  
  def文本文件(
  路径:字符串,
  minPartitions: Int=defaultMinPartitions):抽样[String]=withScope {
  assertNotStopped ()//指定文件路径,输入的格式类为textinputformat,输出的关键类型为longwritable,输出的价值类型为文本//地图(一对=比;pair._2.toString)取出前面的价值,然后将转值为字符串类型//最后将处理后的价值返回成一个新的列表,也就是抽样(字符串)//setName(路径)设置该文件的名字为路径
  名为[TextInputFormat], classOf hadoopFile(路径,名为[LongWritable], classOf名为[文本],classOf
  minPartitions)。地图(一对=比;pair._2.toString) .setName(路径)
  }
  
  关键性的操作就是:
  返回了一个hadoopFile,它有几个参数:
  路径:文件路径
  名为[TextInputFormat]: classOf这个其实就是输入文件的处理类,也就是我先生们中分析过的TextInputFormat,其实就是直接拿过来的用的,不要怀疑,就是酱紫的
  名为[LongWritable], classOf名为[文本]:classOf这两个其实可以猜到了,就是输入的键和值的类型。
  
  接着执行了一个映射(一对=比;pair._2.toString),将KV中转的值为字符串类型 
  

我们接着看看hadoopFile这个方法

  
 <代码> def hadoopFile [K、V] (
  路径:字符串,
  inputFormatClass:类[_ & lt;: InputFormat [K、V]],
  keyClass:类[K],
  valueClass:类[V],
  minPartitions: Int=defaultMinPartitions):抽样((K、V))=withScope {
  assertNotStopped ()//这是一个黑客执行加载hdfs-site.xml。//看到火花- 11227的细节。
  FileSystem.getLocal (hadoopConfiguration)//Hadoop可以大约10 KB,配置相当大,所以广播。
  val confBroadcast=广播(新SerializableConfiguration (hadoopConfiguration))
  val setInputPathsFunc=(jobConf: jobConf)=比;FileInputFormat。setInputPaths (jobConf路径)//看到这里,最后返回的是一个HadoopRDD对象//指定sc对象,配置文件,输入方法类,KV类型,分区个数
  新HadoopRDD (
  这一点,
  confBroadcast,
  一些(setInputPathsFunc),
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions) .setName(路径)
  } 
  

最后返回HadoopRDD对象。

  

接着就是flatMap ( .split (““)). map (( 1)),比较简单

  
 <代码> flatMap (_。分割(" "))
  就是将输入每一行,按照空格切割,然后切割后的元素称为一个新的数组。
  然后将每一行生成的数组合并成一个大数组。
  
  地图((_,- 1))
  将每个元素进行1的计数,组成KV对,K是元素,V是1  
  

接着看.reduceByKey (_ + _

  
 <代码>这个其实就是将同一关键的KV进行聚合分组,然后将同一键的值进行相加,最后就得出某个关键对应的价值,也就是某个单词的个数
  
  看看这个函数
  def reduceByKey (func: (V, V)=比;V):抽样[(K、V)]=自我。withScope {
  reduceByKey (defaultPartitioner(自我)、函数)
  }
  这个过程中会分区,默认分区数是2,使用的是HashPartitioner进行分区,可以指定分区的最小个数 
  

二,火花的资源调度

  

2.1资源调度流程

  

三,火花,火花调度原理分析

  

?图2.1火花资源调度

  

1,执行提交命令,会在客户端客户端启动一个spark-submit进程(用来为司机申请资源)。
2,为司机向主申请资源,在主人的waitingDrivers集合中添加这个司机要申请的信息部分查看工作集合,挑选出合适的工作节点。
3,在选中的工作节点中启动司机进程(司机进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。所以其实司机也需要资源,也只是跑在执行器上的一个线程而已

三,火花,火花调度原理分析