0
点赞
收藏
分享

微信扫一扫

flink on k8s 的实施部署测试

标签(空格分隔): kubernetes系列

一:flink on docker

1.1 flink的介绍

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的
分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,
Flink的流水线运行时系统可以执行批处理和流处理程序。此外,
Flink的运行时本身也支持迭代算法的执行

image.png

二:docker 上面部署flink

2.1 安装docker

yum -y install wget jq psmisc vim net-tools nfs-utils telnet yum-utils device-mapper-persistent-data lvm2 git network-scripts tar curl -y


# 关闭交换分区
sed -ri 's/.*swap.*/#&/' /etc/fstab
swapoff -a && sysctl -w vm.swappiness=0

cat /etc/fstab
# /dev/mapper/centos-swap swap swap defaults 0 0
#

## 关闭 SeLinux
# setenforce 0
# sed -i s/SELINUX=enforcing/SELINUX=disabled/g /etc/selinux/config
# 修改内核参数

yum -y install bridge-utils
modprobe br_netfilter
vim /etc/sysctl.conf
-----
net.ipv4.ip_forward = 1
net.ipv6.conf.all.disable_ipv6 = 1
net.bridge.bridge-nf-call-arptables = 1
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
-----
sysctl -p

cat <<EOF >> /etc/security/limits.conf
* hard nofile 655360
* soft nofile 655360
* hard nproc 655360
* soft nproc 655360
* soft core 655360
* hard core 655360
root hard nofile 655360
root soft nofile 655360

EOF



###系统依赖包
yum install -y conntrack scoat ntpdate ntp ipvsadm ipset jq iptables curl sysstat libseccomp wget vim net-tools git

### 开启ipvs 转发
modprobe br_netfilter

cat > /etc/sysconfig/modules/ipvs.modules << EOF

#!/bin/bash
modprobe -- ip_vs
modprobe -- ip_vs_rr
modprobe -- ip_vs_wrr
modprobe -- ip_vs_sh
modprobe -- nf_conntrack
EOF


chmod 755 /etc/sysconfig/modules/ipvs.modules

bash /etc/sysconfig/modules/ipvs.modules

lsmod | grep -e ip_vs -e nf_conntrack
1) RPM 包安装方法:
提供的官方网址
官方文档:https://docs.docker.com

# 安装依赖包
yum install -y yum-utils device-mapper-persistent-data lvm2
# 添加Docker软件包源
yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

## 查看所有的可用版本
yum list docker-ce --showduplicates | sort -r

# 安装Docker CE
yum install -y docker-ce 【直接安装最新版本】

## 安装所需要的指定版本
#yum install docker-ce-19.03.15-3.el7 docker-ce-cli-19.03.15-3.el7
yum install docker-ce-20.10.23-3.el7 docker-ce-cli-20.10.23-3.el7 docker-ce-rootless-extras-20.10.23-3.el7

## 阿里云镜像加速器
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
registry-mirrors: [https://dfmo7maf.mirror.aliyuncs.com],
exec-opts: [native.cgroupdriver=systemd],
log-driver: json-file,
log-opts: {
max-size: 2048m
},
storage-driver: overlay2
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
sudo systemctl enable docker

## 添加镜像加速
curl -sSL https://get.daocloud.io/daotools/set_mirror.sh | sh -s http://f1361db2.m.daocloud.io

# 启动Docker服务并设置开机启动
systemctl start docker
systemctl enable docker

1.3 封装flink 的docker 镜像

解压flink 与jdk 的包
tar -zxvf flink-1.14.0-bin-scala_2.12.tgz
tar -zxvf jdk1.8.0_201.tar.gz

配置flink的配置文件
cd /root/flink-1.14.0/conf
vim flink-conf.yaml
----
#1. 配置jobmanager rpc 地址
jobmanager.rpc.address: 172.16.10.11

#2. 修改taskmanager内存大小,可改可不改
taskmanager.memory.process.size: 2048 m

#3. 修改一个taskmanager中对于的taskslot个数,可改可不改
taskmanager.numberOfTaskSlots: 4

#修改并行度,可改可不改
parallelism.default: 4


# 打开web 8081 允许 所有主机访问web

rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0

---

vim master
---
172.16.10.11:8081

---
vim worker
---
172.16.10.11

----

tar -zcvf flink.tar.gz flink-1.14.0

vim flink-dockerfile.yaml
--
# Dockerfile
[root@flyfish11]# vi flink-docker.yaml

FROM centos:7
MAINTAINER flyfish
ADD flink-1.14.0.tar.gz /root
ADD jdk1.8.0_201.tar.gz /root
ARG FLINK_VERSION=1.14.0
ARG SCALA_VERSION=2.12
ENV FLINK_HOME=/root/flink-1.14.0
ENV PATH=$FLINK_HOME/bin:$PATH
ENV JAVA_HOME=/root/jdk1.8.0_201
ENV PATH=$JAVA_HOME/bin:$PATH

RUN cd $FLINK_HOME \
&& echo env.java.home: /root/jdk1.8.0_201 >> $FLINK_HOME/conf/flink-conf.yaml \
&& echo FLINK_HOME=/root/flink-1.14.0 >> /etc/profile \
&& echo PATH=$FLINK_HOME/bin:$PATH >> /etc/profile \
&& echo JAVA_HOME=/root/jdk1.8.0_201 >> /etc/profile \
&& echo PATH=$JAVA_HOME/bin:$PATH >> /etc/profile \
&& source /etc/profile

EXPOSE 8081


--
打成镜像处理

docker pull centos:7


docker build -t flink:1.14.0 . -f flink-dockerfile.yaml

image.png image.png

启动flink的docker 

docker run -itd -p 9999:8081 --name flink-test flink:1.14.0 /bin/bash

image.png image.png image.png

运行一个测试任务:
./bin/flink run ./examples/batch/WordCount.jar

image.png

二:flink on k8s

2.1 flink on k8s 的 原理

Flink On K8s standalone 模式执行原理图如下:

image.png

kubectl apply -f flink-configuration-configmap.yaml
kubectl apply -f jobmanager-deployment.yaml
kubectl apply -f jobmanager-rest-service.yaml
kubectl apply -f jobmanager-service.yaml
kubectl apply -f taskmanager-deployment.yaml


kubectl get svc,pod

image.png image.png image.png image.png

安装jdk 

tar -zxvf jdk1.8.0_201.tar.gz
mv jdk1.8.0_201 /usr/local/jdk

vim /etc/profile
---
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin

---
source /etc/profile

java -version

image.png

./bin/flink run -m 172.16.10.11:31688 ./examples/batch/WordCount.jar

image.png image.png image.png image.png image.png

三:原生模式(native 模式)

原生(native)的意义

Flink 直接与 Kubernetes 进行通信并按需申请资源,无需用户指定 TaskManager 资源的数量
flink 内置k8s client,可以直接与k8s交互,不用像standalone模式一样去手动编排jobmanager
和taskmanager 配置文件

原生Session模式
与Standalone模式中的Session模式类似,还是分为两步,先启动一个集群,然后向集群提交任务。
可以通过运行kubernetes-session.sh文件来启动一个集群

./bin/kubernetes-session.sh

或者通过一些超参数来对集群进行设置
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-session \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000

然后在flink客户端,通过flink命令提交任务
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=flink-session examples/streaming/WindowJoin.jar



image.png

原生Session cluster的创建流程为:

1. Flink客户端先通过K8S的ApiServer提交cluster的描述信息,包括ConfigMap spec,
Job Manager Service spec, Job Manager Deployment spec等;
2. K8S接收到这些信息后就会拉取镜像、挂载卷轴来启动Flink master,这时候Dispatcher
与KubernetesResourceManager也会被启动,从而可以接受Flink job;

image.png

当用户通过Flink客户端提交一个job时,客户端就会生成这个job的job graph,
并与这个job的jar包一起提交到Dispatcher,然后就会生成这个job的JobMaster;
最后JobMaster会向KubernetesResourceManager申请slot来执行这个job graph,
如果集群中slot数量不够,KubernetesResourceManager会启动新的TaskManager pod
并将它注册到集群中。
原生Per-Job模式
目前尚处于实验阶段,在Flink 1.11版本中才支持。

官方的使用方式也是与前面Standalone-Per-Cluster模式类似,先创建一个包含用户jar的用于启动Flink Master的docker image,

然后在客户端通过flink命令根据该image提交任务,从而创建一个运行该任务的独立集群。


创建namespace、用户和赋权(RBAC)

# create namespace
kubectl create ns flink-native
# 设置命名空间首选项
kubectl config set-context --current --namespace=flink-native
# create serviceaccount
kubectl create serviceaccount flink
# 赋权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-native

创建挂载目录
mkdir /opt/flink/conf

启动flink集群:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink01\
-Dkubernetes.namespace=flink-native\
-Dkubernetes.service-account=flink \
-Dkubernetes.rest-service.exposed.type=ClusterIP \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container-start-command-template=%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%

image.png image.png image.png

设置svc的模式为NodePort
kubectl edit svc flink-native-session-rest -n flink-native

image.png

提交任务测试:
./bin/flink run -m 172.16.10.11:32103 ./examples/batch/WordCount.jar

image.png image.png

举报

相关推荐

0 条评论