第1章 简介
1.1 概要介绍
Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当JobManager失效时,Yarn将重新启动JobManager。其次Flink Job在恢复时,需要依赖Checkpoint进行恢复,而Checkpoint的快照依赖于远端的存储:HDFS,所以HDFS也必须是高可用,同时JobManager的元数据信息也依赖于HDFS的高可用(namenode的高可用,和多副本机制),再者JobManager元数据的指针信息要依赖于Zookeeper的高可用。本章重点介绍Flink本身的高可用,其他框架的高可用请参考笔者之前的文章。
1.2 Flink on Yarn的优势
相对于 Standalone 模式,在Yarn 模式下有以下几点好处:
1.资源按需使用,提高集群的资源利用率;
2.任务有优先级,根据优先级运行作业;
3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:
JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;
如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;
如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。
第2章 Flink on Yarn模式运行的方式
2.1 Per-Job
Per-Job模式:简答的说就是直接run job,每次提交的任务Yarn都会分配一个JobManager,执行完之后整个资源会释放,包括JobManager和TaskManager。
Per-Job模式适合比较大的任务、执行时间比较长的任务。
2.2 Session
Session模式:在Session模式中, Dispatcher 和 ResourceManager 是可以复用的;当执行完Job之后JobManager并不会释放,Session 模式也称为多线程模式,其特点是资源会一直存在不会释放。使用时先启动yarn-session,然后再提交job,每次提交job,也都会分配一个JobManager。
Session模式适合比较小的任务、执行时间比较短的任务。该模式不用频繁的申请资源和释放资源。
注:本章先简单的了解这两种模式和使用上的区别,笔者会在后续的章节中对其原理进行较为详细的剖析。敬请期待!
第3章 集群规划
笔者是在原有的3台机器名为Hadoop10*的机器上安装的Flink,所以这里机器名都是Hadoop100、Hadoop101、Hadoop102。
|
Hadoop100(Flink) |
Hadoop101(Flink) |
Hadoop102(Flink) |
JobManager |
√ |
√ |
√ |
TaskManager |
√ |
√ |
√ |
第4章 下载安装
Flink官网:https://flink.apache.org/
大家到官网进行下载,先参考我之前的文章进行环境变量的配置:大数据实操篇 No.9-Flink Standalone模式部署及使用
Flink on Yarn部署使用前一定要先安装好Zookeeper和Hadoop(HDFS和Yarn)。
第5章 部署和使用
5.1 修改Hadoop配置
5.1.1 修改yarn-site.xml
修改hadoop配置文件/etc/hadoop/yarn-site.xml,设置application master重启时,尝试的最大次数。
-
<property>
-
<name>yarn.resourcemanager.am.max-attempts
</name>
-
<value>10
</value>
-
<description>
-
The maximum number of application master execution attempts.
-
</description>
-
</property>
5.2 修改masters
修改conf目录下masters文件:
-
hadoop100:
8081
-
hadoop101:
8081
-
hadoop102:
8081
5.3 修改workers
修改conf目录下workers文件:
-
hadoop100
-
hadoop101
-
hadoop102
接下来,笔者用2种模式单独进行部署演示。
5.4 Per-Job模式
5.4.1 修改flink-conf.yaml
修改flink配置文件/conf/flink-conf.yaml
-
# jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
-
jobmanager.rpc.address: localhost
-
-
# jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
-
jobmanager.rpc.port: 6123
-
-
# jobmanager JVM heap 内存大小
-
jobmanager.memory.process.size: 1024m
-
-
# taskmanager JVM heap 内存大小
-
taskmanager.memory.process.size: 1024m
-
-
# 每个taskmanager提供的任务slots数量
-
# 并行度等于TM 数量乘以每个TM 的Solts 数量 TM=并行度/Solts数量 如果slots数量大于8 则只会起一个TM
-
taskmanager.numberOfTaskSlots: 3
-
-
# 并行计算个数
-
parallelism.default: 6
-
-
# 高可用模式
-
high-availability: zookeeper
-
-
# JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
-
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/
-
-
# Zookeeper集群
-
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181
-
-
# 在zookeeper下的根目录
-
high-availability.zookeeper.path.root: /flink_yarn
-
-
# zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
-
#high-availability.cluster-id: /default_yarn
-
-
# 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
-
yarn.application-attempts: 10
-
-
#==============================================================================
-
# Fault tolerance and checkpointing
-
#==============================================================================
-
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
-
state.backend: rocksdb
-
# 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
-
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
-
# 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
-
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
-
# 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
-
state.backend.incremental:
false
-
# jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
-
jobmanager.execution.failover-strategy: region
-
# 全局检查点的保留数量
-
state.checkpoints.num-retained: 3
-
# 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
-
state.backend.local-recovery:
true
-
# 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
-
taskmanager.state.local.root-dirs: /opt/flink-tm-state
修改完所有配置后,注意将配置分发到其他机器上。
注意:Hadoop中的yarn.resourcemanager.am.max-attemps和Flink中的yarn.application-attempts配置含义:
flink job失败重启次数,尝试重新启动9次(9次重试+ 1次初始尝试)
flink job(在yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,flink中的yarn.application-attempts配置不能超过yarn中的yarn.resourcemanager.am.max-attemps。
5.4.2 运行作业
Flink各版本的运行命令可能存在差异,可以先通过-h查看帮助。
$ bin/flink run -h
运行Job
$ bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-job-example
5.4.3 查看运行情况
资源已经受Yarn的管理,可以直接从Yarn集群的Web UI界面打开ApplicationMaster,从而进入到Flink的Web UI控制台:
运行中的作业:
运行完成的作业:
运行完成后,JobManager和TaskManager就被释放了,此时Yarn中显示的运行结果如下:
Linux系统中显示的运行信息:
5.5 Session模式
5.5.1 修改flink-conf.yaml
修改flink配置文件/conf/flink-conf.yaml
-
# jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
-
jobmanager.rpc.address: localhost
-
-
# jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
-
jobmanager.rpc.port: 6123
-
-
# jobmanager JVM heap 内存大小
-
jobmanager.memory.process.size: 1024m
-
-
# taskmanager JVM heap 内存大小
-
taskmanager.memory.process.size: 1024m
-
-
# 每个taskmanager提供的任务slots数量
-
taskmanager.numberOfTaskSlots: 3
-
-
# 并行计算个数
-
parallelism.default: 3
-
-
# 高可用模式
-
high-availability: zookeeper
-
-
# JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
-
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/
-
-
# Zookeeper集群
-
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181
-
-
# 在zookeeper下的根目录
-
high-availability.zookeeper.path.root: /flink_yarn
-
-
# zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
-
#high-availability.cluster-id: /default_yarn
-
-
# 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
-
yarn.application-attempts: 10
-
-
#==============================================================================
-
# Fault tolerance and checkpointing
-
#==============================================================================
-
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
-
state.backend: rocksdb
-
# 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
-
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
-
# 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
-
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
-
# 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
-
state.backend.incremental:
false
-
# jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
-
jobmanager.execution.failover-strategy: region
-
# 全局检查点的保留数量
-
state.checkpoints.num-retained: 3
-
# 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
-
state.backend.local-recovery:
true
-
# 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
-
taskmanager.state.local.root-dirs: /opt/flink-tm-state
修改完所有配置后,注意将配置分发到其他机器上。
5.5.2 运行/停止
这介绍Session的两种运行模式,根据自己的实际业务场景使用:
- 客户端模式
对于客户端模式而言,可以启动多个yarn session,一个yarn session模式对应一个JobManager,并按照需求提交作业,同一个Session中可以提交多个Flink作业。
- 分离模式
只能启动一个yarn-sission,如果启动多个,后面的session会一直处于等待,同一个yarn-session中可以提交多个Flink作业,通过-d参数指定,表示即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。
$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -d
启动yarn-session前可以先了解一下参数的含义,查看参数说明
$ bin/yarn-session.sh -h
启动yarn-session
$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024
启动yarn-session后,检查进程:
-
[zihao@zookeeper110
~]$ xsh jps
-
-----------zookeeper110-----------
-
7252
QuorumPeerMain
-
7958
Jps
-
-----------zookeeper111-----------
-
7714
Jps
-
7130
QuorumPeerMain
-
-----------zookeeper112-----------
-
7747
Jps
-
7116
QuorumPeerMain
-
-----------hadoop100-----------
-
8162
DataNode
-
8549
DFSZKFailoverController
-
8758
NodeManager
-
8375
JournalNode
-
14167
FlinkYarnSessionCli
-
14375
Jps
-
8031
NameNode
-
-----------hadoop101-----------
-
8002
DataNode
-
8163
JournalNode
-
8259
DFSZKFailoverController
-
7910
NameNode
-
8838
NodeManager
-
18536
YarnSessionClusterEntrypoint
-
18633
Jps
-
8701
ResourceManager
-
-----------hadoop102-----------
-
7666
ResourceManager
-
7523
JournalNode
-
10405
Jps
-
7769
NodeManager
-
7434
DataNode
YarnSessionClusterEntrypoint可以理解为Flink在Yarn上启动的ApplicationMaster,其内部就运行着三各组件:Dispatcher、ResourceManager和 JobManager。它们是在同一个jvm进程中。
停止Yarn-session
hadoop路径查看yarn上运行的任务
$ bin/yarn application –list
停止对应Application-Id对应的任务
$ bin/yarn application -kill application_1598166743428_0001
向Yarn上提交作业
Yarn-session启动后,界面上会提示一个地址(主机和端口),这个地址就是JobManager(也是ApplicationMaster)的地址。
在提交job的时候,可以在yarn-session启动的机器上直接提交,或者在其他机器上通过-m指定jobmanager的地址进行提交:
$ bin/flink run -m hadoop101:39998 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-example
注:笔者这里运行的是wordcount示例,将之前HDFS上已有的数据进行了wordcount分析,并将分析结果输出到HDFS上。
5.5.3 查看运行情况
启动yarn-session后:
提交作业,作业开始运行:
作业运行完成:
在HDFS中查看运行结果:
第6章 高可用演示
在Flink on Yarn模式中,Session方式的高可用是针对JobManager(ApplicationMaster)的高可用,而session模式中启动yarn-session后,该进程常驻在Yarn中,其高可用依赖于Yarn自身的资源管理机制,在JobManager(ApplicationMaster)挂掉之后,Yarn会在集群中重启该进程。
Per-Job方式,内部复用的是standalone JobManager 进程的HA,本章就不再介绍了,详见笔者上一篇文章:大数据实操篇 No.10-Flink Standalone集群HA高可用部署
6.1 启动yarn-session
先按前几章介绍的,把Zookeeper、HDFS、Yarn、Flink全部启动起来。并启动yarn-session。
此时,我们查看以下进程,发现YarnSessionClusterEntrypoint这个进程启动再hadoop100这台机器上:
-
[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-
-----------zookeeper110-----------
-
7142 QuorumPeerMain
-
7192 Jps
-
-----------zookeeper111-----------
-
7156 Jps
-
7103 QuorumPeerMain
-
-----------zookeeper112-----------
-
7133 Jps
-
7087 QuorumPeerMain
-
-----------hadoop100-----------
-
7872 DFSZKFailoverController
-
7652 JournalNode
-
8260 FlinkYarnSessionCli
-
7285 NameNode
-
8006 NodeManager
-
8630 YarnSessionClusterEntrypoint
-
8712 Jps
-
7402 DataNode
-
-----------hadoop101-----------
-
7217 NameNode
-
8017 NodeManager
-
8356 Jps
-
7579 JournalNode
-
7867 ResourceManager
-
7293 DataNode
-
7869 DFSZKFailoverController
-
-----------hadoop102-----------
-
7795 Jps
-
7319 ResourceManager
-
7209 JournalNode
-
7099 DataNode
-
7439 NodeManager
6.2 查看Flink Web UI
通过Yarn集群管理界面打开Flink Web UI:
6.3 Kill掉ApplicationMaster
Kill掉YarnSessionClusterEntrypoint(ApplicationMaster)进程,此时Flink Web UI界面会短暂的不可用,Yarn正在尝试重新启动ApplicationMaster。重试次数由Yarn配置中的yarn.resourcemanager.am.max-attempts和Flink配置中的yarn.application-attempts决定。
稍等片刻后我们再重新再yarn集群管理界面打开Flink Web UI:
在JobManager的log中我们也能发现这次leader选举的日志信息:
6.4 查看恢复后的进程
再重新查看进程,发现YarnSessionClusterEntrypoint已经重新在hadoop102上启动了:
-
[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-
-----------zookeeper110-----------
-
7237 Jps
-
7142 QuorumPeerMain
-
-----------zookeeper111-----------
-
7188 Jps
-
7103 QuorumPeerMain
-
-----------zookeeper112-----------
-
7165 Jps
-
7087 QuorumPeerMain
-
-----------hadoop100-----------
-
7872 DFSZKFailoverController
-
7652 JournalNode
-
8260 FlinkYarnSessionCli
-
7285 NameNode
-
8006 NodeManager
-
7402 DataNode
-
8875 Jps
-
-----------hadoop101-----------
-
7217 NameNode
-
8017 NodeManager
-
7579 JournalNode
-
7867 ResourceManager
-
7293 DataNode
-
7869 DFSZKFailoverController
-
8447 Jps
-
-----------hadoop102-----------
-
8084 YarnSessionClusterEntrypoint
-
7319 ResourceManager
-
8151 Jps
-
7209 JournalNode
-
7099 DataNode
-
7439 NodeManager
这里就演示完了Flink on Yarn Session模式下,JobManager(ApplicaionMaster)的高可用。
至此,Flink on Yarn集群高可用模式的部署及使用就完成了。 后续的章节我们将开始进入实时计算整条链路的实战环节!
最后推荐一个课程给大家,也是笔者在学习Flink的过程中发现的一个优质课程《Apache Flink 知其然,知其所以然》,Apache Flink PMC主讲,欢迎大家扫描二维码,加入钉钉群进行学习交流:
转载:https://blog.csdn.net/dzh284616172/article/details/108296551