一 安装指引
安装这块本文就不展开具体的步骤了,毕竟大家的机子环境都不尽相同。不过可以简单说几点重要的步骤,然后节末放上一些安装示例供大家参考。
1)要使用PySpark,机子上要有Java开发环境
2)环境变量记得要配置完整
3)Mac下的/usr/local/ 路径一般是隐藏的,PyCharm配置py4j和pyspark的时候可以使用 shift+command+G 来使用路径访问。
4)Mac下如果修改了 ~/.bash_profile 的话,记得要重启下PyCharm才会生效的哈
5)版本记得要搞对,保险起见Java的jdk版本选择低版本(别问我为什么知道),我选择的是Java8.
下面是一些示例,可以参考下:
1)Mac下安装spark,并配置pycharm-pyspark完整教程
https://blog.csdn.net/shiyutianming/article/details/99946797
2)virtualBox里安装开发环境
https://www.bilibili.com/video/BV1i4411i79a?p=3
3)快速搭建spark开发环境,云哥项目
https://github.com/lyhue1991/eat_pyspark_in_10_days
二 Spark基础概念
关于Spark的基础概念,我在先前的文章里也有写过,大家可以一起来回顾一下 《想学习Spark?先带你了解一些基础的知识》。作为补充,今天在这里也介绍一些在Spark中会经常遇见的专有名词。
Q1: 什么是RDD
RDD的全称是 Resilient Distributed Datasets,这是Spark的一种数据抽象集合,它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务。
Q2: RDD运行时相关的关键名词
简单来说可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,这几个东西在调优的时候也会经常遇到的。
Client:指的是客户端进程,主要负责提交job到Master;
Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作;
Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,可以给worker分配任务和资源(主要是driver和executor资源);
Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,需要定期给Master回报heartbeat(心跳),启动Driver和Executor;
Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,包括了job的解析、Stage的生成、调度Task到Executor上去执行;
Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage;
Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程;
Executor:指的是 执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。
Q3: 什么是DAG
全称是 Directed Acyclic Graph,中文名是有向无环图。Spark就是借用了DAG对RDD之间的关系进行了建模,用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,有些任务需要在一些任务执行完成了才可以执行的。在Spark调度中就是有DAGscheduler,它负责将job分成若干组Task组成的Stage。
Q4: Spark的部署模式有哪些
主要有local模式、Standalone模式、Mesos模式、YARN模式。
1.Standalone: 独立模式,Spark 原生的简单集群管理器, 自带完整的服务, 可单独部署到一个集群中,无需依赖任何其他资源管理系统, 使用 Standalone 可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管理框架的时候才会使用。
2.Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 yarn.
3.YARN: 统一的资源管理机制, 在上面可以运行多套计算框架, 如map reduce、storm 等, 根据 driver 在集群中的位置不同,分为 yarn client 和 yarn cluster。
实际上Spark内部为了方便用户测试,自身也提供了一些部署模式。由于在实际工厂环境下使用的绝大多数的集群管理器是 Hadoop YARN,因此我们关注的重点是 Hadoop YARN 模式下的 Spark 集群部署。
用户在提交任务给 Spark 处理时,以下两个参数共同决定了 Spark 的运行方式。
·– master MASTER_URL :决定了 Spark 任务提交给哪种集群处理。
·– deploy-mode DEPLOY_MODE:决定了 Driver 的运行方式,可选值为 Client或者 Cluster。
Q5: Shuffle操作是什么
Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。
Q6: 什么是惰性执行
这是RDD的一个特性,在RDD中的算子可以分为Transformation算子和Action算子,其中Transformation算子的操作都不会真正执行,只会记录一下依赖关系,直到遇见了Action算子,在这之前的所有Transform操作才会被触发计算,这就是所谓的惰性执行。具体哪些是Transformation和Action算子,可以看下一节。
三 Spark与Pyspark架构
Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。
对于pyspark,为了不破坏Spark已有的运行时架构,Spark在外围包装一层Python API。在Driver端,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序。在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码。
四 PySpark简介
Apache Spark是用Scala编程语言编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,您也可以使用Python编程语言处理RDD。正是由于一个名为Py4j的库,他们才能实现这一目标。
3.0 运行pyspark的方式
pyspark主要通过以下一些方式运行。
1 通过pyspark进入pyspark单机交互式环境。
这种方式一般用来测试代码。
也可以指定jupyter或者ipython为交互环境。
2 通过spark-submit提交Spark任务到集群运行。
这种方式可以提交Python脚本或者Jar包到集群上让成百上千个机器运行任务。
这也是工业界生产中通常使用spark的方式。
3 Python安装findspark和pyspark库。
可以在jupyter和其它Python环境中像调用普通库一样地调用pyspark库。
这也是本书配置pyspark练习环境的方式。
通过spark-submit提交任务到集群运行常见问题
以下为在集群上运行pyspark时相关的一些问题,
1,pyspark是否能够调用Scala或者Java开发的jar包?
答:只有Driver中能够调用jar包,通过Py4J进行调用,在executors中无法调用。
2,pyspark如何在excutors中安装诸如pandas,numpy等包?
答:可以通过conda建立Python环境,然后将其压缩成zip文件上传到hdfs中,并在提交任务时指定环境。
当然,最简单直接的方案是把你想要的anaconda环境打包成zip上传到集群hdfs环境中。注意,你打包的机器应当和集群的机器具有相同的linux操作系统。
3,pyspark如何添加自己编写的其它Python脚本到excutors中的PYTHONPATH中?
答:可以用py-files参数设置,可以添加.py,.egg 或者压缩成.zip的Python脚本,在excutors中可以import它们。
4,pyspark如何添加一些配置文件到各个excutors中的工作路径中?
答:可以用files参数设置,不同文件名之间以逗号分隔,在excutors中用SparkFiles.get(fileName)获取。
-
#提交python写的任务
-
spark-submit --master yarn \
-
--deploy-mode cluster \
-
--executor-memory 12G \
-
--driver-memory 12G \
-
--num-executors 100 \
-
--executor-cores 2 \
-
--conf spark.yarn.maxAppAttempts=2 \
-
--conf spark.default.parallelism=1600 \
-
--conf spark.sql.shuffle.partitions=1600 \
-
--conf spark.memory.offHeap.enabled=
true \
-
--conf spark.memory.offHeap.size=2g\
-
--conf spark.task.maxFailures=10 \
-
--conf spark.stage.maxConsecutiveAttempts=10 \
-
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python
#指定excutors的Python环境
-
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python
#cluster模式时候设置
-
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip
#上传到hdfs的Python环境
-
--files data.csv,profile.txt
-
--py-files pkg.py,tqdm.py
-
pyspark_demo.py
PySpark - SparkContext
SparkContext是任何spark功能的入口点。当我们运行任何Spark应用程序时,会启动一个驱动程序,它具有main函数,并且此处启动了SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。
SparkContext使用Py4J启动JVM并创建JavaSparkContext。默认情况下,PySpark将SparkContext作为'sc'提供,因此创建新的SparkContext将不起作用。
以下代码块包含PySpark类的详细信息以及SparkContext可以采用的参数。
-
class pyspark.SparkContext (
-
master = None,
-
appName = None,
-
sparkHome = None,
-
pyFiles = None,
-
environment = None,
-
batchSize = 0,
-
serializer = PickleSerializer(),
-
conf = None,
-
gateway = None,
-
jsc = None,
-
profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
-
)
以下是SparkContext的参数具体含义:
Master
- 它是连接到的集群的URL。appName
- 您的工作名称。sparkHome
- Spark安装目录。pyFiles
- 要发送到集群并添加到PYTHONPATH的.zip或.py文件。environment
- 工作节点环境变量。batchSize
- 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。serializer
- RDD序列化器。Conf
- L {SparkConf}的一个对象,用于设置所有Spark属性。gateway
- 使用现有网关和JVM,否则初始化新JVM。JSC
- JavaSparkContext实例。profiler_cls
- 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)
在上述参数中,主要使用master和appname。任何PySpark程序的会使用以下两行:
-
from pyspark
import SparkContext
-
sc = SparkContext(
"local",
"Hello Pyspark")
3.1 SparkContext示例 - PySpark Shell
现在你对SparkContext有了足够的了解,让我们在PySpark shell上运行一个简单的例子。在这个例子中,我们将计算README.md文件中带有字符“a”或“b”的行数。那么,让我们说如果一个文件中有5行,3行有字符'a',那么输出将是→ Line with a:3。字符'b'也是如此。
我们不会在以下示例中创建任何SparkContext对象,因为默认情况下,当PySpark shell启动时,Spark会自动创建名为sc的SparkContext对象。如果您尝试创建另一个SparkContext对象,您将收到以下错误 - “ValueError:无法一次运行多个SparkContexts”。
在终端输入pyspark 启动PySpark Shell:
-
>>> logFile=
"file:user/local/hadoop-2.8.5/README.txt"
-
>>> logData=sc.textFile(logFile).cache()
-
>>> numAs=logData.filter(lambda s:
'a'
in s).count()
-
>>> numBs=logData.filter(lambda s:
'b'
in s).count()
-
>>>
print(
"Line with a:%i,line with b:%i" % (numAs,numBs))
-
Line with a:20, line with b:10
3.2 SparkContext示例 - Python程序
让我们使用Python程序运行相同的示例。创建一个名为demo.py的Python文件,并在该文件中输入以下代码。
-
from pyspark
import SparkContext
-
logFile =
"file:usr/local/hadoop-2.8.5/README.txt"
-
sc = SparkContext(
"local",
"Hello PySpark")
-
logData = sc.textFile(logFile).cache()
-
numAs = logData.filter(
lambda s:
'a'
in s).count()
-
numBs = logData.filter(
lambda s:
'b'
in s).count()
-
print(
"Line with a:%i,lines with b :%i" % (numAs, numBs))
然后我们将在终端中执行以下命令来运行此Python文件。我们将得到与上面相同的输出。
spark-submit demo.py
学习资源推荐:
1)edureka about PySpark Tutorial
印度老哥的课程,B站可直接看,不过口音略难听懂不过还好有字幕。
https://www.bilibili.com/video/BV1i4411i79a?p=1
2)eat_pyspark_in_10_days
梁云大哥的课程,讲得超级清晰,建议精读。
https://github.com/lyhue1991/eat_pyspark_in_10_days
3)官方文档
http://spark.apache.org/docs/latest/api/python/
https://www.tutorialspoint.com/pyspark/index.htm
转载:https://blog.csdn.net/zwqjoy/article/details/115768413