PySpark进阶——深入剖析wordcount.py

  

在本文中,我们借由深入剖析wordcount。py来揭开火花内部各种概念的面纱。我们再次回顾wordcount。py代码来回答如下问题

<李>

对于大多数语言的你好词示例,都有主要()函数,wordcount.py的主要函数,或者说调用火花的主要()在哪里

<李>

数据的读入,各个抽样数据如何转换

<李>

地图与flatMap的工作机制,以及区别

<李>

reduceByKey的作用

WordCount。py的代码如下:

得到__future__  import  print_functionimport  sysfrom  operator  import 添加#,SparkSession:是一个对火花的编程入口,取代了原本的SQLContext与HiveContext,方便调用数据集和DataFrame  API #, SparkSession可用于创建DataFrame,将DataFrame注册为表,在表上执行SQL,缓存表和读取镶花文件.from  pyspark.sql  import  SparkSessionif  __name__ ==,“__main__":,,,, #, Python 常用的简单参数传入   ,,,if  len (sys.argv), !=, 2:   ,,,,,,,印刷(“用法:,wordcount  & lt; file>“,,文件=sys.stderr)   ,,,,,,,退出(1),,,,,,,,   ,,,#,appName 为,Spark 应用设定一个应用名,改名会显示在,Spark  Web  UI 上   ,,,#,假如SparkSession 已经存在就取得已存在的SparkSession,否则创建一个新的。   ,,,spark =SparkSession \   ,,,,,,.builder \   ,,,,,,,.appName (“PythonWordCount") \   ,,,,,,,.getOrCreate (),,,,,,,,   ,,,#,读取传入的文件内容,并写入一个新的抽样实例线中,此条语句所做工作有些多,不适合初学者,可以截成两条语句以便理解。   ,,,#,地图是一种转换函数,将原来抽样的每个数据项通过地图中的用户自定义函数f映射转变为一个新的元素。原始抽样中的数据项与新抽样中的数据项是一一对应的关系。   ,,,lines =, spark.read.text (sys.argv [1]) .rdd.map (lambda  r, r [0]),,,   ,,,#,flatMap与地图类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化”后输出,   ,,,counts =, lines.flatMap (lambda  x:, x.split (& # 39;, & # 39;)), \   ,,,,,,,,,,,,,,,,,. map (lambda  x:, (x,, 1)), \   ,,,,,,,,,,,,,,,,,.reduceByKey(添加),,,,,,,,,,,,,,,,   ,,,#,收集(),在驱动程序中将数据集的所有元素作为数组返回只这在返回足够小的数据子集的过滤器或其他操作之后通常是有用的。由于collect 是将整个抽样汇聚到一台机子上,所以通常需要预估返回数据集的大小以免溢出只,,,,,,,,,,,,   ,,,output =, counts.collect (),,,,   ,,,for (单词,,数),拷贝输出:   ,,,,,,,印刷(“% s: %我,%,(单词,,))      ,,,spark.stop ()
火花入口SparkSession

Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用火花的各项功能,这边不妨对照Http会话,在此火花就在充当Web服务的角色,程序调用火花功能的时候需要先建立一个会话。因此看到getOrCreate()就很容易理解了,表明可以视情况新会话或建利用已有的会话。

,,,, spark =SparkSession \   ,,,,,,.builder \   ,,,,,,,.appName (“PythonWordCount") \   ,,,,,,,.getOrCreate ()

既然将火花想象成一个Web服务器,也就意味着可能用多个访问在进行,为了便于监控管理,对应用命名一个恰当的名称是个好办了对法Web这类UI并不是本文的重点,有兴趣的同学可以参考?火花应用程序的Web控制台

加载数据

在建立SparkSession之后,就是读入数据并写入到Dateset中。

,,,, lines =, spark.read.text (sys.argv [1]) .rdd.map (lambda  r, r [0])

为了更好的分解执行过程,是时候借助PySpark了,PySpark是python调用火花的API,它可以启动一个交互式python Shell。为了方便脚本调试,暂时切换到Linux执行

#, pysparkPython  2.7.6 (默认情况下,,Jun  22岁,2015年,17:58:13),   [GCC  4.8.2],提醒linux2   想Type “帮助”,,“copyright",,“credits",趁机“license" for  more 信息。   Using 火花# 39;s  default  log4j 简介:;org/apache/spark/log4j-defaults.properties   Setting  default  log  level 用“WARN"。   用adjust  logging  level  use  sc.setLogLevel(中的)只For  SparkR,, use  setLogLevel(中的)。   17/02/23  08:30:26  WARN  NativeCodeLoader:, Unable 用load  native-hadoop  library  for  your 平台……,using  builtin-java  classes  where 适用   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

PySpark进阶——深入剖析wordcount.py