飞道的博客

大数据实操篇 No.11-Flink on Yarn集群HA高可用部署及使用

636人阅读  评论(0)

第1章 简介

1.1 概要介绍

Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当JobManager失效时,Yarn将重新启动JobManager。其次Flink Job在恢复时,需要依赖Checkpoint进行恢复,而Checkpoint的快照依赖于远端的存储:HDFS,所以HDFS也必须是高可用,同时JobManager的元数据信息也依赖于HDFS的高可用(namenode的高可用,和多副本机制),再者JobManager元数据的指针信息要依赖于Zookeeper的高可用。本章重点介绍Flink本身的高可用,其他框架的高可用请参考笔者之前的文章。

1.2 Flink on Yarn的优势

相对于 Standalone 模式,在Yarn 模式下有以下几点好处

1.资源按需使用,提高集群的资源利用率;

2.任务有优先级,根据优先级运行作业;

3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:

      JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;

      如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;

      如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。

 

第2章 Flink on Yarn模式运行的方式

2.1 Per-Job

Per-Job模式:简答的说就是直接run job,每次提交的任务Yarn都会分配一个JobManager,执行完之后整个资源会释放,包括JobManager和TaskManager。

Per-Job模式适合比较大的任务、执行时间比较长的任务。

2.2 Session

Session模式:在Session模式中, Dispatcher 和 ResourceManager 是可以复用的;当执行完Job之后JobManager并不会释放,Session 模式也称为多线程模式,其特点是资源会一直存在不会释放。使用时先启动yarn-session,然后再提交job,每次提交job,也都会分配一个JobManager。
Session模式适合比较小的任务、执行时间比较短的任务。该模式不用频繁的申请资源和释放资源。

注:本章先简单的了解这两种模式和使用上的区别,笔者会在后续的章节中对其原理进行较为详细的剖析。敬请期待!

 

第3章 集群规划

笔者是在原有的3台机器名为Hadoop10*的机器上安装的Flink,所以这里机器名都是Hadoop100、Hadoop101、Hadoop102。 

 

Hadoop100(Flink)

Hadoop101(Flink)

Hadoop102(Flink)

JobManager

TaskManager

 

第4章 下载安装

Flink官网:https://flink.apache.org/

大家到官网进行下载,先参考我之前的文章进行环境变量的配置:大数据实操篇 No.9-Flink Standalone模式部署及使用

Flink on Yarn部署使用前一定要先安装好Zookeeper和Hadoop(HDFS和Yarn)。

 

第5章 部署和使用

5.1 修改Hadoop配置

5.1.1 修改yarn-site.xml

修改hadoop配置文件/etc/hadoop/yarn-site.xml,设置application master重启时,尝试的最大次数。


  
  1. <property>
  2.     <name>yarn.resourcemanager.am.max-attempts </name>
  3.     <value>10 </value>
  4.     <description>
  5.     The maximum number of application master execution attempts.
  6.     </description>
  7. </property>

5.2 修改masters

 修改conf目录下masters文件:


  
  1. hadoop100: 8081
  2. hadoop101: 8081
  3. hadoop102: 8081

5.3 修改workers

 修改conf目录下workers文件:


  
  1. hadoop100
  2. hadoop101
  3. hadoop102

接下来,笔者用2种模式单独进行部署演示。

5.4 Per-Job模式

5.4.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml


  
  1. # jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  2. jobmanager.rpc.address: localhost
  3. # jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  4. jobmanager.rpc.port: 6123
  5. # jobmanager JVM heap 内存大小
  6. jobmanager.memory.process.size: 1024m
  7. # taskmanager JVM heap 内存大小
  8. taskmanager.memory.process.size: 1024m
  9. # 每个taskmanager提供的任务slots数量
  10. # 并行度等于TM 数量乘以每个TM 的Solts 数量 TM=并行度/Solts数量 如果slots数量大于8 则只会起一个TM
  11. taskmanager.numberOfTaskSlots: 3
  12. # 并行计算个数
  13. parallelism.default: 6
  14. # 高可用模式
  15. high-availability: zookeeper
  16. # JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
  17. high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/
  18. # Zookeeper集群
  19. high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181
  20. # 在zookeeper下的根目录
  21. high-availability.zookeeper.path.root: /flink_yarn
  22. # zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
  23. #high-availability.cluster-id: /default_yarn
  24. # 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
  25. yarn.application-attempts: 10
  26. #==============================================================================
  27. # Fault tolerance and checkpointing
  28. #==============================================================================
  29. # jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
  30. state.backend: rocksdb
  31. # 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
  32. state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
  33. # 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
  34. state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
  35. # 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
  36. state.backend.incremental: false
  37. # jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
  38. jobmanager.execution.failover-strategy: region
  39. # 全局检查点的保留数量
  40. state.checkpoints.num-retained: 3
  41. # 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
  42. state.backend.local-recovery: true
  43. # 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
  44. taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完所有配置后,注意将配置分发到其他机器上。

注意:Hadoop中的yarn.resourcemanager.am.max-attemps和Flink中的yarn.application-attempts配置含义:

flink job失败重启次数,尝试重新启动9次(9次重试+ 1次初始尝试)
flink job(在yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,flink中的yarn.application-attempts配置不能超过yarn中的yarn.resourcemanager.am.max-attemps。

5.4.2 运行作业

Flink各版本的运行命令可能存在差异,可以先通过-h查看帮助。

$ bin/flink run -h

运行Job

$ bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-job-example

 5.4.3 查看运行情况

资源已经受Yarn的管理,可以直接从Yarn集群的Web UI界面打开ApplicationMaster,从而进入到Flink的Web UI控制台:

运行中的作业: 

运行完成的作业: 

运行完成后,JobManager和TaskManager就被释放了,此时Yarn中显示的运行结果如下: 

Linux系统中显示的运行信息: 

5.5 Session模式

5.5.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml


  
  1. # jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  2. jobmanager.rpc.address: localhost
  3. # jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  4. jobmanager.rpc.port: 6123
  5. # jobmanager JVM heap 内存大小
  6. jobmanager.memory.process.size: 1024m
  7. # taskmanager JVM heap 内存大小
  8. taskmanager.memory.process.size: 1024m
  9. # 每个taskmanager提供的任务slots数量
  10. taskmanager.numberOfTaskSlots: 3
  11. # 并行计算个数
  12. parallelism.default: 3
  13. # 高可用模式
  14. high-availability: zookeeper
  15. # JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
  16. high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/
  17. # Zookeeper集群
  18. high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181
  19. # 在zookeeper下的根目录
  20. high-availability.zookeeper.path.root: /flink_yarn
  21. # zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
  22. #high-availability.cluster-id: /default_yarn
  23. # 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
  24. yarn.application-attempts: 10
  25. #==============================================================================
  26. # Fault tolerance and checkpointing
  27. #==============================================================================
  28. # jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
  29. state.backend: rocksdb
  30. # 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
  31. state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
  32. # 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
  33. state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
  34. # 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
  35. state.backend.incremental: false
  36. # jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
  37. jobmanager.execution.failover-strategy: region
  38. # 全局检查点的保留数量
  39. state.checkpoints.num-retained: 3
  40. # 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
  41. state.backend.local-recovery: true
  42. # 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
  43. taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完所有配置后,注意将配置分发到其他机器上。 

5.5.2 运行/停止

这介绍Session的两种运行模式,根据自己的实际业务场景使用:

  • 客户端模式

对于客户端模式而言,可以启动多个yarn session,一个yarn session模式对应一个JobManager,并按照需求提交作业,同一个Session中可以提交多个Flink作业。

  • 分离模式

只能启动一个yarn-sission,如果启动多个,后面的session会一直处于等待,同一个yarn-session中可以提交多个Flink作业,通过-d参数指定,表示即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -d

启动yarn-session前可以先了解一下参数的含义,查看参数说明

$ bin/yarn-session.sh -h

启动yarn-session

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024

启动yarn-session后,检查进程:


  
  1. [zihao@zookeeper110 ~]$ xsh jps
  2. -----------zookeeper110-----------
  3. 7252 QuorumPeerMain
  4. 7958 Jps
  5. -----------zookeeper111-----------
  6. 7714 Jps
  7. 7130 QuorumPeerMain
  8. -----------zookeeper112-----------
  9. 7747 Jps
  10. 7116 QuorumPeerMain
  11. -----------hadoop100-----------
  12. 8162 DataNode
  13. 8549 DFSZKFailoverController
  14. 8758 NodeManager
  15. 8375 JournalNode
  16. 14167 FlinkYarnSessionCli
  17. 14375 Jps
  18. 8031 NameNode
  19. -----------hadoop101-----------
  20. 8002 DataNode
  21. 8163 JournalNode
  22. 8259 DFSZKFailoverController
  23. 7910 NameNode
  24. 8838 NodeManager
  25. 18536 YarnSessionClusterEntrypoint
  26. 18633 Jps
  27. 8701 ResourceManager
  28. -----------hadoop102-----------
  29. 7666 ResourceManager
  30. 7523 JournalNode
  31. 10405 Jps
  32. 7769 NodeManager
  33. 7434 DataNode

YarnSessionClusterEntrypoint可以理解为Flink在Yarn上启动的ApplicationMaster,其内部就运行着三各组件:Dispatcher、ResourceManager和 JobManager。它们是在同一个jvm进程中。

停止Yarn-session
hadoop路径查看yarn上运行的任务

$ bin/yarn application –list

停止对应Application-Id对应的任务

$ bin/yarn application -kill application_1598166743428_0001

向Yarn上提交作业

Yarn-session启动后,界面上会提示一个地址(主机和端口),这个地址就是JobManager(也是ApplicationMaster)的地址。

 在提交job的时候,可以在yarn-session启动的机器上直接提交,或者在其他机器上通过-m指定jobmanager的地址进行提交:

$ bin/flink run -m hadoop101:39998 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-example

注:笔者这里运行的是wordcount示例,将之前HDFS上已有的数据进行了wordcount分析,并将分析结果输出到HDFS上。

5.5.3 查看运行情况

 启动yarn-session后:

 提交作业,作业开始运行:

 作业运行完成:

 

 在HDFS中查看运行结果:

 

第6章 高可用演示

在Flink on Yarn模式中,Session方式的高可用是针对JobManager(ApplicationMaster)的高可用,而session模式中启动yarn-session后,该进程常驻在Yarn中,其高可用依赖于Yarn自身的资源管理机制,在JobManager(ApplicationMaster)挂掉之后,Yarn会在集群中重启该进程。

Per-Job方式,内部复用的是standalone JobManager 进程的HA,本章就不再介绍了,详见笔者上一篇文章:大数据实操篇 No.10-Flink Standalone集群HA高可用部署

6.1 启动yarn-session

先按前几章介绍的,把Zookeeper、HDFS、Yarn、Flink全部启动起来。并启动yarn-session。

此时,我们查看以下进程,发现YarnSessionClusterEntrypoint这个进程启动再hadoop100这台机器上:


  
  1. [zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
  2. -----------zookeeper110-----------
  3. 7142 QuorumPeerMain
  4. 7192 Jps
  5. -----------zookeeper111-----------
  6. 7156 Jps
  7. 7103 QuorumPeerMain
  8. -----------zookeeper112-----------
  9. 7133 Jps
  10. 7087 QuorumPeerMain
  11. -----------hadoop100-----------
  12. 7872 DFSZKFailoverController
  13. 7652 JournalNode
  14. 8260 FlinkYarnSessionCli
  15. 7285 NameNode
  16. 8006 NodeManager
  17. 8630 YarnSessionClusterEntrypoint
  18. 8712 Jps
  19. 7402 DataNode
  20. -----------hadoop101-----------
  21. 7217 NameNode
  22. 8017 NodeManager
  23. 8356 Jps
  24. 7579 JournalNode
  25. 7867 ResourceManager
  26. 7293 DataNode
  27. 7869 DFSZKFailoverController
  28. -----------hadoop102-----------
  29. 7795 Jps
  30. 7319 ResourceManager
  31. 7209 JournalNode
  32. 7099 DataNode
  33. 7439 NodeManager

6.2 查看Flink Web UI

通过Yarn集群管理界面打开Flink Web UI:

 

6.3 Kill掉ApplicationMaster 

Kill掉YarnSessionClusterEntrypoint(ApplicationMaster)进程,此时Flink Web UI界面会短暂的不可用,Yarn正在尝试重新启动ApplicationMaster。重试次数由Yarn配置中的yarn.resourcemanager.am.max-attempts和Flink配置中的yarn.application-attempts决定。

稍等片刻后我们再重新再yarn集群管理界面打开Flink Web UI:

 

在JobManager的log中我们也能发现这次leader选举的日志信息:

6.4 查看恢复后的进程

再重新查看进程,发现YarnSessionClusterEntrypoint已经重新在hadoop102上启动了:


  
  1. [zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
  2. -----------zookeeper110-----------
  3. 7237 Jps
  4. 7142 QuorumPeerMain
  5. -----------zookeeper111-----------
  6. 7188 Jps
  7. 7103 QuorumPeerMain
  8. -----------zookeeper112-----------
  9. 7165 Jps
  10. 7087 QuorumPeerMain
  11. -----------hadoop100-----------
  12. 7872 DFSZKFailoverController
  13. 7652 JournalNode
  14. 8260 FlinkYarnSessionCli
  15. 7285 NameNode
  16. 8006 NodeManager
  17. 7402 DataNode
  18. 8875 Jps
  19. -----------hadoop101-----------
  20. 7217 NameNode
  21. 8017 NodeManager
  22. 7579 JournalNode
  23. 7867 ResourceManager
  24. 7293 DataNode
  25. 7869 DFSZKFailoverController
  26. 8447 Jps
  27. -----------hadoop102-----------
  28. 8084 YarnSessionClusterEntrypoint
  29. 7319 ResourceManager
  30. 8151 Jps
  31. 7209 JournalNode
  32. 7099 DataNode
  33. 7439 NodeManager

这里就演示完了Flink on Yarn Session模式下,JobManager(ApplicaionMaster)的高可用。

 

至此,Flink on Yarn集群高可用模式的部署及使用就完成了。 后续的章节我们将开始进入实时计算整条链路的实战环节!

最后推荐一个课程给大家,也是笔者在学习Flink的过程中发现的一个优质课程《Apache Flink 知其然,知其所以然》,Apache Flink PMC主讲,欢迎大家扫描二维码,加入钉钉群进行学习交流:


转载:https://blog.csdn.net/dzh284616172/article/details/108296551
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场