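The Spark framework is implemented mainly in Scala, with a small amount of Java code, and Scala is also Spark's primary user-facing programming interface. In data science, however, Python has long played an important role. Many data engineers still rely on Python libraries for data processing and scientific computing, such as NumPy, pandas, and scikit-learn. Python is also much easier to learn than Scala. Spark therefore introduced PySpark, which provides a Python interface on top of the Spark framework so that data scientists can use it easily. This article examines how PySpark is implemented at the source-code level and covers the following topics:
1. PySpark's multi-process architecture;
2. Calling Java and Scala interfaces from Python;
3. The RDD and SQL interfaces on the Python Driver;
4. Inter-process communication and serialization on the Executor;
5. Pandas UDFs;
6. Summary.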
01. PySpark's multi-process architecture
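PySpark uses a multi-process architecture in which Python and the JVM run in separate processes. The Driver and every Executor each have both a Python process and a JVM process. When a PySpark Python script is submitted with spark-submit, the Driver runs the script directly and launches the JVM from Python. RDD or DataFrame operations invoked in Python are forwarded to the Java API through Py4J. The Executor side works the other way round. The JVM Executor process is started first, at the Driver's request. The JVM then launches Python child processes to run Python code, such as UDFs and the functions passed to RDD operations, and communicates with those children over sockets. The overall architecture is shown in the figure below: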
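The Executor-side pattern described above can be sketched with the standard library alone: a parent process launches a Python child, and the two talk over a local socket. This is a minimal sketch under several assumptions. The parent is plain Python standing in for the Executor JVM. The framing is a 4-byte big-endian length prefix, with a negative length as the end marker. The names `WORKER_CODE`, `read_exact` and `run_udf_in_worker` are hypothetical. The "UDF" is just `str.upper`. PySpark's real worker protocol is considerably more involved.

```python
import socket
import struct
import subprocess
import sys
import threading

# Hypothetical worker program (not PySpark's real worker): it connects back
# to the parent, reads length-prefixed UTF-8 strings, applies a toy "UDF"
# (str.upper) and writes each result back with the same framing.
WORKER_CODE = r'''
import socket, struct, sys

def read_exact(sock, n):
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("socket closed early")
        buf += chunk
    return buf

with socket.create_connection(("127.0.0.1", int(sys.argv[1]))) as sock:
    while True:
        (length,) = struct.unpack("!i", read_exact(sock, 4))
        if length < 0:                          # end-of-input marker
            sock.sendall(struct.pack("!i", -1))
            break
        out = read_exact(sock, length).decode("utf-8").upper().encode("utf-8")
        sock.sendall(struct.pack("!i", len(out)) + out)
'''


def read_exact(sock, n):
    """Read exactly n bytes from sock, or raise if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("socket closed early")
        buf += chunk
    return buf


def run_udf_in_worker(values):
    """Parent side, standing in for the Executor JVM: start a worker process,
    stream `values` to it over a socket and collect the transformed results."""
    with socket.socket() as server:
        server.bind(("127.0.0.1", 0))  # let the OS pick a free port
        server.listen(1)
        server.settimeout(10)          # don't wait forever if the worker fails
        port = server.getsockname()[1]
        proc = subprocess.Popen([sys.executable, "-c", WORKER_CODE, str(port)])
        conn, _ = server.accept()

    with conn:
        def send_inputs():
            for v in values:
                data = v.encode("utf-8")
                conn.sendall(struct.pack("!i", len(data)) + data)
            conn.sendall(struct.pack("!i", -1))

        # Send from a separate thread so that neither side can block forever
        # on a full socket buffer while the other side is also writing.
        writer = threading.Thread(target=send_inputs)
        writer.start()
        results = []
        while True:
            (length,) = struct.unpack("!i", read_exact(conn, 4))
            if length < 0:
                break
            results.append(read_exact(conn, length).decode("utf-8"))
        writer.join()
    proc.wait()
    return results


print(run_udf_in_worker(["spark", "py4j"]))  # ['SPARK', 'PY4J']
```

Inputs are fed from their own thread while the main thread reads results. With large inputs, a single thread that first writes everything and then reads could deadlock: each process would block writing into a socket buffer that the other is not draining.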
02. How the Python Driver calls Java interfaces
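As mentioned above, after a PySpark job is submitted with spark-submit, the Driver first runs the user's Python script. Most of Spark's APIs, however, are written in Scala or Java, so Python needs a way to call Java interfaces. PySpark uses the open-source library Py4J for this. Creating a SparkContext object on the Python side actually launches the JVM and creates a SparkContext object on the Scala side. The implementation is in python/pyspark/context.py: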
GitHub source:
https://github.com/apache/spark/blob/master/python/pyspark/context.py
Source as rendered in the documentation:
http://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext
@classmethod
def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    """
    Checks whether a SparkContext is initialized or not.
    Throws error if a SparkContext is already running.
    """
    with SparkContext._lock:
        if not SparkContext._gateway:
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm
        if instance:
            if (SparkContext._active_spark_context and
                    SparkContext._active_spark_context != instance):
                currentMaster = SparkContext._active_spark_context.master
                currentAppName = SparkContext._active_spark_context.appName
                callsite = SparkContext._active_spark_context._callsite
                # Raise error if there is already a running Spark context
                raise ValueError(
                    "Cannot run multiple SparkContexts at once; "
                    "existing SparkContext(app=%s, master=%s)"
                    " created by %s at %s:%s "
                    % (currentAppName, currentMaster,
                       callsite.function, callsite.file, callsite.linenum))
            else:
                SparkContext._active_spark_context = instance
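The guard in `_ensure_initialized` comes down to two pieces of class-level state protected by a lock. One is a gateway that is created once and then shared. The other is a single slot for the active context. The sketch below keeps only that logic, using a hypothetical `ToyContext` class. `object()` stands in for `launch_gateway(conf)`, and `stop()` here simply clears the active slot. It illustrates the pattern under those assumptions and does not reproduce SparkContext's full behavior.

```python
import threading


class ToyContext:
    """Hypothetical stand-in for SparkContext that keeps only the
    'one shared gateway, one active context' logic."""

    _lock = threading.RLock()
    _gateway = None          # shared by every instance, created only once
    _active_context = None   # the single running context, if any

    def __init__(self, app_name):
        self.app_name = app_name
        ToyContext._ensure_initialized(self)

    @classmethod
    def _ensure_initialized(cls, instance=None):
        with cls._lock:
            if cls._gateway is None:
                cls._gateway = object()  # placeholder for launch_gateway(conf)
            if instance is not None:
                active = cls._active_context
                if active is not None and active is not instance:
                    raise ValueError(
                        "Cannot run multiple contexts at once; "
                        "existing context(app=%s)" % active.app_name)
                cls._active_context = instance

    def stop(self):
        with ToyContext._lock:
            if ToyContext._active_context is self:
                ToyContext._active_context = None


first = ToyContext("first")
gateway = ToyContext._gateway
try:
    ToyContext("second")
except ValueError as e:
    print(e)  # Cannot run multiple contexts at once; existing context(app=first)
first.stop()
second = ToyContext("second")  # allowed once the first context is stopped
```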
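launch_gateway (in python/pyspark/java_gateway.py) first starts the JVM process through spark-submit. It then reads the port and secret that the JVM writes to a temporary file, and finally connects to the JVM through Py4J: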
def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway
    Parameters
    ----------
    conf : :py:class:`pyspark.SparkConf`
        spark configuration passed to spark-submit
    popen_kwargs : dict
        Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    Returns
    -------
    ClientServer or JavaGateway
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)
        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)
            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)
            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)
            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")
            with open(conn_info_file, "rb") as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)
        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)
    # Connect to the gateway (or client server to pin the thread between JVM and Python)
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        gateway = ClientServer(
            java_parameters=JavaParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True),
            python_parameters=PythonParameters(
                port=0,
                eager_load=False))
    else:
        gateway = JavaGateway(
            gateway_parameters=GatewayParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True))
    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc
    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.resource.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")
    return gateway
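The startup handshake in `launch_gateway` has three steps. It passes a file path to the child through an environment variable. It polls until the file appears or the child exits. It then reads an int and a length-prefixed UTF-8 string. All of this can be reproduced with the standard library. In this sketch a Python child stands in for the JVM started by spark-submit. The environment variable `TOY_CONN_INFO_PATH`, the fixed port `12345`, the secret `"s3cret"` and the helpers `read_int32`, `read_utf8_string` and `launch_toy_gateway` are hypothetical. The byte layout is meant to mirror what `read_int` and `UTF8Deserializer().loads` read in the code above: a big-endian 4-byte int, then a 4-byte length followed by UTF-8 bytes.

```python
import os
import shutil
import struct
import subprocess
import sys
import tempfile
import time

# Hypothetical child standing in for the JVM: it publishes a port and a secret
# through the file named in TOY_CONN_INFO_PATH, then waits for EOF on stdin.
CHILD_CODE = r'''
import os, struct, sys
path = os.environ["TOY_CONN_INFO_PATH"]
secret = "s3cret".encode("utf-8")
tmp = path + ".tmp"
with open(tmp, "wb") as f:
    f.write(struct.pack("!i", 12345))                 # port as big-endian int32
    f.write(struct.pack("!i", len(secret)) + secret)  # length-prefixed UTF-8
os.replace(tmp, path)  # atomic rename: the parent never sees a partial file
sys.stdin.read()       # block until the parent closes our stdin
'''


def read_int32(stream):
    return struct.unpack("!i", stream.read(4))[0]


def read_utf8_string(stream):
    length = read_int32(stream)
    return stream.read(length).decode("utf-8")


def launch_toy_gateway(timeout=10.0):
    conn_info_dir = tempfile.mkdtemp()
    try:
        conn_info_file = os.path.join(conn_info_dir, "conn_info")
        env = dict(os.environ, TOY_CONN_INFO_PATH=conn_info_file)
        # stdin=PIPE gives the parent a way to tell the child to exit (EOF)
        proc = subprocess.Popen([sys.executable, "-c", CHILD_CODE],
                                stdin=subprocess.PIPE, env=env)
        deadline = time.monotonic() + timeout
        # Wait for the file to appear or for the child to exit, whichever comes first
        while proc.poll() is None and not os.path.isfile(conn_info_file):
            if time.monotonic() > deadline:
                proc.kill()
                raise TimeoutError("child did not publish its connection info")
            time.sleep(0.05)
        if not os.path.isfile(conn_info_file):
            raise RuntimeError("child exited before sending its port number")
        with open(conn_info_file, "rb") as info:
            port = read_int32(info)
            secret = read_utf8_string(info)
    finally:
        shutil.rmtree(conn_info_dir)
    return proc, port, secret


proc, port, secret = launch_toy_gateway()
print(port, secret)  # 12345 s3cret
proc.stdin.close()   # EOF on the child's stdin lets it exit
proc.wait()
```

The child writes under a temporary name and renames the file with `os.replace`, so the parent never opens a half-written file. Closing `proc.stdin` delivers EOF to the child. This follows the same idea as the stdin pipe in `launch_gateway`, which lets the gateway die when the pipe is broken. The polling loop tests `proc.poll() is None` rather than `not proc.poll()`. `poll()` returns `0` for a child that exited successfully, and `not 0` is true.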
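Advantages of PySpark
Compared with plain Python, PySpark offers the following advantages:
- PySpark builds on the Hadoop ecosystem, so it can read data files stored in HDFS directly. Plain Python needs a third-party library such as hdfs3 to read files from HDFS. PySpark can also read Hive tables (with Hive support enabled) and, through connectors, HBase tables.
- A PySpark DataFrame is distributed across the cluster rather than limited to one machine's memory. It can therefore hold much larger datasets than a pandas DataFrame, which makes genuine big-data processing possible. It also carries an explicit schema (column names and types), which pandas does not provide in the same form.
- PySpark works with Hadoop and processes large files in a distributed fashion, which is more efficient.
- PySpark wraps machine learning libraries and provides Pipelines, which makes development much more convenient.
- PySpark runs on a distributed parallel computing framework with built-in parallelism. Data and operations are automatically distributed across the nodes of the cluster.
- PySpark processes data in memory. RDDs or DataFrames can be cached in memory to reduce I/O and speed up processing, whereas plain Python can only cache data on a single machine.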
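PySpark provides two machine learning packages: MLlib and ML. MLlib is based on RDDs. Starting with Spark 2.0, the RDD-based API entered maintenance mode, which means no new features are added. It was expected to be removed in Spark 3.0, although it is still shipped in the Spark 3.x releases. ML provides a higher-level, DataFrame-based API for building machine learning workflows (Pipelines). ML Pipelines address the shortcomings of the original MLlib library by offering users a DataFrame-based suite of APIs for machine learning workflows.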
References
https://www.mobvista.com/cn/blog/2019-12-27-2/
How PySpark starts up:
https://blog.csdn.net/wangyaninglm/article/details/114038572
Reposted from: https://blog.csdn.net/wangyaninglm/article/details/117077547