(TOC)
一、wordcount程序的执行过程
<代码类=" language-scala ">进口org.apache.spark.rdd.RDD 进口org.apache.spark。{SparkConf, SparkContext} 对象WordCount { def主要(args:数组[String]):单位={//创建火花配置文件对象。设置应用名称,主人地址,当地表示为本地模式。//如果是提交到集群中,通常不指定。因为可能在多个集群汇上跑,写死不方便 配置=new SparkConf瓦尔().setAppName (“wordCount”)//创建火花上下文对象 val sc=new SparkContext(配置) sc.textFile .flatMap (_ (args (0))。分割(" ")) . map ((_, - 1)) .reduceByKey (_ + _) .saveAsTextFile (args (1)) sc.stop () } }代码>
核心代码很简单,首先看文本文件这个函数
<代码> SparkContext.scala def文本文件( 路径:字符串, minPartitions: Int=defaultMinPartitions):抽样[String]=withScope { assertNotStopped ()//指定文件路径,输入的格式类为textinputformat,输出的关键类型为longwritable,输出的价值类型为文本//地图(一对=比;pair._2.toString)取出前面的价值,然后将转值为字符串类型//最后将处理后的价值返回成一个新的列表,也就是抽样(字符串)//setName(路径)设置该文件的名字为路径 名为[TextInputFormat], classOf hadoopFile(路径,名为[LongWritable], classOf名为[文本],classOf minPartitions)。地图(一对=比;pair._2.toString) .setName(路径) } 关键性的操作就是: 返回了一个hadoopFile,它有几个参数: 路径:文件路径 名为[TextInputFormat]: classOf这个其实就是输入文件的处理类,也就是我先生们中分析过的TextInputFormat,其实就是直接拿过来的用的,不要怀疑,就是酱紫的 名为[LongWritable], classOf名为[文本]:classOf这两个其实可以猜到了,就是输入的键和值的类型。 接着执行了一个映射(一对=比;pair._2.toString),将KV中转的值为字符串类型代码>
我们接着看看hadoopFile这个方法
<代码> def hadoopFile [K、V] ( 路径:字符串, inputFormatClass:类[_ & lt;: InputFormat [K、V]], keyClass:类[K], valueClass:类[V], minPartitions: Int=defaultMinPartitions):抽样((K、V))=withScope { assertNotStopped ()//这是一个黑客执行加载hdfs-site.xml。//看到火花- 11227的细节。 FileSystem.getLocal (hadoopConfiguration)//Hadoop可以大约10 KB,配置相当大,所以广播。 val confBroadcast=广播(新SerializableConfiguration (hadoopConfiguration)) val setInputPathsFunc=(jobConf: jobConf)=比;FileInputFormat。setInputPaths (jobConf路径)//看到这里,最后返回的是一个HadoopRDD对象//指定sc对象,配置文件,输入方法类,KV类型,分区个数 新HadoopRDD ( 这一点, confBroadcast, 一些(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions) .setName(路径) }代码>
最后返回HadoopRDD对象。
接着就是flatMap ( .split (““)). map (( 1)),比较简单
<代码> flatMap (_。分割(" ")) 就是将输入每一行,按照空格切割,然后切割后的元素称为一个新的数组。 然后将每一行生成的数组合并成一个大数组。 地图((_,- 1)) 将每个元素进行1的计数,组成KV对,K是元素,V是1 代码>
接着看.reduceByKey (_ + _
<代码>这个其实就是将同一关键的KV进行聚合分组,然后将同一键的值进行相加,最后就得出某个关键对应的价值,也就是某个单词的个数 看看这个函数 def reduceByKey (func: (V, V)=比;V):抽样[(K、V)]=自我。withScope { reduceByKey (defaultPartitioner(自我)、函数) } 这个过程中会分区,默认分区数是2,使用的是HashPartitioner进行分区,可以指定分区的最小个数代码>
二,火花的资源调度
2.1资源调度流程
?图2.1火花资源调度
1,执行提交命令,会在客户端客户端启动一个spark-submit进程(用来为司机申请资源)。
2,为司机向主申请资源,在主人的waitingDrivers集合中添加这个司机要申请的信息部分查看工作集合,挑选出合适的工作节点。
3,在选中的工作节点中启动司机进程(司机进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。所以其实司机也需要资源,也只是跑在执行器上的一个线程而已