飞道的博客

PySpark入门学习教程---介绍(1)

230人阅读  评论(0)

一 安装指引

安装这块本文就不展开具体的步骤了,毕竟大家的机子环境都不尽相同。不过可以简单说几点重要的步骤,然后节末放上一些安装示例供大家参考。

1)要使用PySpark,机子上要有Java开发环境

2)环境变量记得要配置完整

3)Mac下的/usr/local/ 路径一般是隐藏的,PyCharm配置py4j和pyspark的时候可以使用 shift+command+G 来使用路径访问。

4)Mac下如果修改了 ~/.bash_profile 的话,记得要重启下PyCharm才会生效的哈

5)版本记得要搞对,保险起见Java的jdk版本选择低版本(别问我为什么知道),我选择的是Java8.

 

下面是一些示例,可以参考下:

1)Mac下安装spark,并配置pycharm-pyspark完整教程

https://blog.csdn.net/shiyutianming/article/details/99946797

2)virtualBox里安装开发环境

https://www.bilibili.com/video/BV1i4411i79a?p=3

3)快速搭建spark开发环境,云哥项目

https://github.com/lyhue1991/eat_pyspark_in_10_days

 

二 Spark基础概念

关于Spark的基础概念,我在先前的文章里也有写过,大家可以一起来回顾一下 《想学习Spark?先带你了解一些基础的知识》。作为补充,今天在这里也介绍一些在Spark中会经常遇见的专有名词。

Q1: 什么是RDD

RDD的全称是 Resilient Distributed Datasets,这是Spark的一种数据抽象集合,它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务。

 

Q2: RDD运行时相关的关键名词

简单来说可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,这几个东西在调优的时候也会经常遇到的。

Client:指的是客户端进程,主要负责提交job到Master;

Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作;

Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,可以给worker分配任务和资源(主要是driver和executor资源);

Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,需要定期给Master回报heartbeat(心跳),启动Driver和Executor;

Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,包括了job的解析、Stage的生成、调度Task到Executor上去执行;

Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage;

Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程;

Executor:指的是 执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。

Q3: 什么是DAG

全称是 Directed Acyclic Graph,中文名是有向无环图。Spark就是借用了DAG对RDD之间的关系进行了建模,用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,有些任务需要在一些任务执行完成了才可以执行的。在Spark调度中就是有DAGscheduler,它负责将job分成若干组Task组成的Stage。

Q4: Spark的部署模式有哪些

主要有local模式、Standalone模式、Mesos模式、YARN模式。
1.Standalone: 独立模式,Spark 原生的简单集群管理器, 自带完整的服务, 可单独部署到一个集群中,无需依赖任何其他资源管理系统, 使用 Standalone 可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管理框架的时候才会使用。
2.Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 yarn.
3.YARN: 统一的资源管理机制, 在上面可以运行多套计算框架, 如map reduce、storm 等, 根据 driver 在集群中的位置不同,分为 yarn client 和 yarn cluster。
实际上Spark内部为了方便用户测试,自身也提供了一些部署模式。由于在实际工厂环境下使用的绝大多数的集群管理器是 Hadoop YARN,因此我们关注的重点是 Hadoop YARN 模式下的 Spark 集群部署。

用户在提交任务给 Spark 处理时,以下两个参数共同决定了 Spark 的运行方式。
·– master MASTER_URL :决定了 Spark 任务提交给哪种集群处理。
·– deploy-mode DEPLOY_MODE:决定了 Driver 的运行方式,可选值为 Client或者 Cluster。

Q5: Shuffle操作是什么

Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。

Q6: 什么是惰性执行

这是RDD的一个特性,在RDD中的算子可以分为Transformation算子和Action算子,其中Transformation算子的操作都不会真正执行,只会记录一下依赖关系,直到遇见了Action算子,在这之前的所有Transform操作才会被触发计算,这就是所谓的惰性执行。具体哪些是Transformation和Action算子,可以看下一节。

三 Spark与Pyspark架构

Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。

对于pyspark,为了不破坏Spark已有的运行时架构,Spark在外围包装一层Python API。在Driver端,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序。在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码。

 

四 PySpark简介

Apache Spark是用Scala编程语言编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,您也可以使用Python编程语言处理RDD。正是由于一个名为Py4j的库,他们才能实现这一目标。

3.0 运行pyspark的方式

pyspark主要通过以下一些方式运行。
1 通过pyspark进入pyspark单机交互式环境。
这种方式一般用来测试代码。
也可以指定jupyter或者ipython为交互环境。

2 通过spark-submit提交Spark任务到集群运行。
这种方式可以提交Python脚本或者Jar包到集群上让成百上千个机器运行任务。
这也是工业界生产中通常使用spark的方式。

3 Python安装findspark和pyspark库。
可以在jupyter和其它Python环境中像调用普通库一样地调用pyspark库。
这也是本书配置pyspark练习环境的方式。

通过spark-submit提交任务到集群运行常见问题

以下为在集群上运行pyspark时相关的一些问题,
1,pyspark是否能够调用Scala或者Java开发的jar包?
答:只有Driver中能够调用jar包,通过Py4J进行调用,在executors中无法调用。
2,pyspark如何在excutors中安装诸如pandas,numpy等包?
答:可以通过conda建立Python环境,然后将其压缩成zip文件上传到hdfs中,并在提交任务时指定环境。
当然,最简单直接的方案是把你想要的anaconda环境打包成zip上传到集群hdfs环境中。注意,你打包的机器应当和集群的机器具有相同的linux操作系统。
3,pyspark如何添加自己编写的其它Python脚本到excutors中的PYTHONPATH中?
答:可以用py-files参数设置,可以添加.py,.egg 或者压缩成.zip的Python脚本,在excutors中可以import它们。
4,pyspark如何添加一些配置文件到各个excutors中的工作路径中?
答:可以用files参数设置,不同文件名之间以逗号分隔,在excutors中用SparkFiles.get(fileName)获取。


  
  1. #提交python写的任务
  2. spark-submit --master yarn \
  3. --deploy-mode cluster \
  4. --executor-memory 12G \
  5. --driver-memory 12G \
  6. --num-executors 100 \
  7. --executor-cores 2 \
  8. --conf spark.yarn.maxAppAttempts=2 \
  9. --conf spark.default.parallelism=1600 \
  10. --conf spark.sql.shuffle.partitions=1600 \
  11. --conf spark.memory.offHeap.enabled= true \
  12. --conf spark.memory.offHeap.size=2g\
  13. --conf spark.task.maxFailures=10 \
  14. --conf spark.stage.maxConsecutiveAttempts=10 \
  15. --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
  16. --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
  17. --archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
  18. --files data.csv,profile.txt
  19. --py-files pkg.py,tqdm.py
  20. pyspark_demo.py

 

PySpark - SparkContext

SparkContext是任何spark功能的入口点。当我们运行任何Spark应用程序时,会启动一个驱动程序,它具有main函数,并且此处启动了SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。

SparkContext使用Py4J启动JVM并创建JavaSparkContext。默认情况下,PySpark将SparkContext作为'sc'提供,因此创建新的SparkContext将不起作用。

以下代码块包含PySpark类的详细信息以及SparkContext可以采用的参数。


  
  1. class pyspark.SparkContext (
  2. master = None,
  3. appName = None,
  4. sparkHome = None,
  5. pyFiles = None,
  6. environment = None,
  7. batchSize = 0,
  8. serializer = PickleSerializer(),
  9. conf = None,
  10. gateway = None,
  11. jsc = None,
  12. profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
  13. )

以下是SparkContext的参数具体含义:

  • Master- 它是连接到的集群的URL。
  • appName- 您的工作名称。
  • sparkHome - Spark安装目录。
  • pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。
  • environment - 工作节点环境变量。
  • batchSize - 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。
  • serializer- RDD序列化器。
  • Conf - L {SparkConf}的一个对象,用于设置所有Spark属性。
  • gateway - 使用现有网关和JVM,否则初始化新JVM。
  • JSC - JavaSparkContext实例。
  • profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)

在上述参数中,主要使用master和appname。任何PySpark程序的会使用以下两行:


  
  1. from pyspark import SparkContext
  2. sc = SparkContext( "local", "Hello Pyspark")

3.1 SparkContext示例 - PySpark Shell

现在你对SparkContext有了足够的了解,让我们在PySpark shell上运行一个简单的例子。在这个例子中,我们将计算README.md文件中带有字符“a”或“b”的行数。那么,让我们说如果一个文件中有5行,3行有字符'a',那么输出将是→ Line with a:3。字符'b'也是如此。

我们不会在以下示例中创建任何SparkContext对象,因为默认情况下,当PySpark shell启动时,Spark会自动创建名为sc的SparkContext对象。如果您尝试创建另一个SparkContext对象,您将收到以下错误 - “ValueError:无法一次运行多个SparkContexts”。

在终端输入pyspark 启动PySpark Shell:


  
  1. >>> logFile= "file:user/local/hadoop-2.8.5/README.txt"
  2. >>> logData=sc.textFile(logFile).cache()
  3. >>> numAs=logData.filter(lambda s: 'a' in s).count()
  4. >>> numBs=logData.filter(lambda s: 'b' in s).count()
  5. >>> print( "Line with a:%i,line with b:%i" % (numAs,numBs))
  6. Line with a:20, line with b:10

3.2 SparkContext示例 - Python程序

让我们使用Python程序运行相同的示例。创建一个名为demo.py的Python文件,并在该文件中输入以下代码。


  
  1. from pyspark import SparkContext
  2. logFile = "file:usr/local/hadoop-2.8.5/README.txt"
  3. sc = SparkContext( "local", "Hello PySpark")
  4. logData = sc.textFile(logFile).cache()
  5. numAs = logData.filter( lambda s: 'a' in s).count()
  6. numBs = logData.filter( lambda s: 'b' in s).count()
  7. print( "Line with a:%i,lines with b :%i" % (numAs, numBs))

然后我们将在终端中执行以下命令来运行此Python文件。我们将得到与上面相同的输出。

spark-submit demo.py 

 

学习资源推荐:

1)edureka about PySpark Tutorial

印度老哥的课程,B站可直接看,不过口音略难听懂不过还好有字幕。

https://www.bilibili.com/video/BV1i4411i79a?p=1

2)eat_pyspark_in_10_days

梁云大哥的课程,讲得超级清晰,建议精读。

https://github.com/lyhue1991/eat_pyspark_in_10_days

3)官方文档

http://spark.apache.org/docs/latest/api/python/

https://www.tutorialspoint.com/pyspark/index.htm

 

 

Spark的三种集群部署模式

大数据入门与实战-PySpark的使用教程

 

 

 


转载:https://blog.csdn.net/zwqjoy/article/details/115768413
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场