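Introduction
High availability (HA) for Flink on YARN rests on several layers. First, it depends on YARN's own HA mechanism (ResourceManager HA): YARN manages the JobManager and restarts it when it fails. Second, a Flink job recovers from a checkpoint, and checkpoint snapshots are stored remotely on HDFS, so HDFS must also be highly available. The JobManager metadata likewise depends on HDFS HA (NameNode HA plus block replication). Finally, the pointers to the JobManager metadata are stored in ZooKeeper, so ZooKeeper must be highly available as well.
Note: jobs that were started before HA was enabled must be restarted once the HA configuration is in place.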
Configuration
1) In the CDH YARN configuration, set the maximum number of ApplicationMaster restart attempts (yarn.resourcemanager.am.max-attempts).
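As a quick check of step 1, the sketch below reads the ApplicationMaster attempt limit back from yarn-site.xml. It assumes the limit is stored under the standard YARN property yarn.resourcemanager.am.max-attempts and that each <name> line is immediately followed by its <value> line. The function name get_hadoop_property is made up for this example.

```shell
# Print the <value> of one property from a Hadoop-style XML config file.
# Usage: get_hadoop_property FILE PROPERTY_NAME
# Assumption: the <value> line directly follows the matching <name> line.
get_hadoop_property() {
    # -F matches the name literally (dots are not wildcards);
    # -A1 also prints the line after the match, where <value> is expected.
    grep -F -A1 "<name>$2</name>" "$1" |
        sed -n 's:.*<value>\(.*\)</value>.*:\1:p'
}
```

For example, `get_hadoop_property /etc/hadoop/conf/yarn-site.xml yarn.resourcemanager.am.max-attempts` prints the configured limit. The path is an assumption and differs between installations. The printed value should be at least as large as yarn.application-attempts in flink-conf.yaml.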

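2) Edit the Flink configuration file flink-conf.yaml
# Flink on YARN, per-job mode, high availability
# High-availability mode
high-availability: zookeeper
# JobManager metadata is kept in the file system under storageDir; a pointer to this state is stored in ZooKeeper
high-availability.storageDir: hdfs:///flink/ha/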
#
# # ZooKeeper quorum; replace with the addresses of your own cluster
high-availability.zookeeper.quorum: 192.168.22.241:2181,192.168.22.241:2182,192.168.22.241:2183
#
# # Root node in ZooKeeper under which Flink stores its data
high-availability.zookeeper.path.root: /flink_yarn
#
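# # Cluster ID node under the ZooKeeper root; it holds all coordination data the cluster needs.
# # Several Flink clusters sharing one ZooKeeper ensemble must each use a different cluster ID.
# # The official documentation recommends leaving this option unset: on YARN (and Mesos),
# # an unset cluster-id defaults to the YARN Application ID, which guarantees uniqueness.
high-availability.cluster-id: /default_yarn
#
# # Number of restart attempts for a single Flink job; must be less than or equal to the ApplicationMaster attempt limit configured in yarn-site.xml
yarn.application-attempts: 6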
#
# If ZooKeeper runs in Kerberos secure mode
#
## default is "zookeeper". If the ZooKeeper quorum is configured
## with a different service name then it can be supplied here.
#
#zookeeper.sasl.service-name: zookeeper
#
## default is "Client". The value needs to match one of the values
## configured in "security.kerberos.login.contexts".
#zookeeper.sasl.login-context-name: Client
Test
The test below uses Flink on YARN in per-job mode.
1) Start the job:
/opt/cloudera/parcels/FLINK/lib/flink/bin/flink run \
-m yarn-cluster -yjm 2048 -ytm 4096 \
-c com.flink.jobs.customer.gift.newCustomerGift -p 4 -ys 4 \
-ynm newCustomerGift \
/home/ln/newCustomerGift-1.0-jar-with-dependencies.jar \
     --setCK  false \
     --inputKafkaCluster  192.168.22.241:9091,192.168.22.241:9092,192.168.22.241:9093 \
     --inputKafkaGroup  newCustomerGift \
     --inputKafkaTopic  WeChatFreeze \
     --saveAnaData  false \
     --parseOutMysqlIp  "" \
     --parseOutMysqlPort  "" \
     --parseOutMysqlUser  "" \
     --parseOutMysqlPasswd  "" \
     --parseOutMysqlDB  "" \
     --parseOutMysqlTB  "" \
     --logicTrdCd  PUSH_WX_001 \
     --getCardInfomationRedisBrokerList  192.168.22.241:6379,192.168.22.241:6380,192.168.22.241:6381 \
     --getCardInfomationRedisKeyName  databus-activate-xkl-cardInfomation \
     --checkLastNewCustomerGiftRedisBrokerList  192.168.22.241:6379,192.168.22.241:6380,192.168.22.241:6381 \
     --checkLastNewCustomerGiftRedisKeyName  databus-activate-xkl-lastNewCustomerGift \
     --newCustomerGiftResOutMysqlIp  127.0.0.1 \
     --newCustomerGiftResOutMysqlPort  3306 \
     --newCustomerGiftResOutMysqlUser  root \
     --newCustomerGiftResOutMysqlPasswd  wangxin@1 \
     --newCustomerGiftResOutMysqlDB  databus_activate \
     --newCustomerGiftResOutMysqlTB  xkl_res_stream_out \
     --newCustomerGiftHisBatchMysqlTB xkl_batch \
     --newCustomerGiftResOutKafkaCluster  192.168.22.241:9091,192.168.22.241:9092,192.168.22.241:9093 \
     --newCustomerGiftResOutKafkaTopic  databus_activate_xkl_output \
     --collectErrorMysqlTab xkl_error_data \
     --activeCardStreamKafkaTopic  databus_activate_kjh_output \
     --printLog false \
     --jobName  newCustomerGift
2) Check the job:
At this point, leader election has completed.

3) Use jps to find the YarnJobClusterEntrypoint process, then kill it:
kill -9 <PID>
4) Open the job's web UI again; it shows that the job is restarting.

Success.
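The kill step of this test can be scripted. The sketch below only extracts the PID from jps output. The function name find_jobmanager_pid is hypothetical, and the code assumes the default jps format of one "PID ClassName" pair per line.

```shell
# Read `jps` output on stdin and print the PID of every
# YarnJobClusterEntrypoint process (the per-job JobManager).
find_jobmanager_pid() {
    awk '$2 == "YarnJobClusterEntrypoint" { print $1 }'
}

# Example, to be run on the host where the JobManager container lives:
#   pid=$(jps | find_jobmanager_pid)
#   [ -n "$pid" ] && kill -9 $pid
```

After the kill, YARN should start a new ApplicationMaster attempt, provided the attempt limit has not yet been reached.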










