【Flink on k8s】Flink on Kubernetes 部署模式

阅读 73

2021-09-21

1.Flink on Kubernetes 的背景

Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的长稳性
Flink 特性:提供的实时服务是需要长时间、稳定地运行,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
Kubernetes 优势为应用提供了部署、管理能力,同时保证其稳定运行。Kubernetes 具有很好的生态,可以集成各种运维工具,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的扩缩容机制,可以大大提高资源利用率。

2.Flink Session 和 Application 模式

2.1 Session 模式

Session 模式简介

预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager。

优点:

Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即作业启动速度快

缺点:

资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足。
作业隔离性差,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启。

部署指导

参考:Flink on Standalone Kubernetes Reference
集群配置
集群配置通过 configmap 挂载到容器中
flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

Deployment 文件
Flink 镜像上传到私有镜像仓。编辑 jobmanager-service.yaml、jobmanager-deployment.yaml、taskmanager-deployment.yaml

jobmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

2.2 Application 模式

Application 模式简介

每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。

优点:

一个作业独占一个集群,作业的隔离性好

缺点:

资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用。

3.Flink Native Session 模式

3.1 Native Session 模式简介

类似 Session 模式,需要预先构建 JobManager。不同点是用户通过 Flink Client 向 JobManager 提交作业后,根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源,最后把作业提交到 TaskManager 上。

3.2 优缺点分析

优点:

TaskManager 的资源是实时的、按需进行的创建,对资源的利用率更高

缺点:

作业真正运行起来的时间较长,因为需要等待 TaskManager 创建。

3.3 部署指导

参考:Native Kubernetes - Session Mode
集群配置
集群配置通过 configmap 挂载到容器中,如上 2.1 所示。
新增如下配置:
flink-configuration-configmap.yaml

kubernetes.cluster-id: my-first-flink-cluster
execution.attached: true

② 配置 jobmanager-deployment.yaml
如上 2.1 所示,需要把启动脚本修改为 ./bin/kubernetes-session.sh

jobmanager-deployment.yaml

// 忽略...

    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/kubernetes-session.sh;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]

// 忽略...

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml

4.Flink Native Application 模式

4.1 Native Application 模式简介

类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性,即 Flink 直接与 Kubernetes 进行通信并按需申请资源,无需用户指定 TaskManager 资源的数量。

4.2 优缺点分析

优点:

① 一个作业独占一个集群,作业的隔离性好。
资源利用率相对较高,按需申请 JobManager 和 TaskManager。

缺点:

① 一个作业独占一个集群,JobManager 不能复用
作业启动较慢,在作业提交后,才开始创建 JobManager 和 TaskManager。

5.Flink 运行模式总结

模式 隔离性 作业启动时间 资源利用率 资源按需创建
Session 弱,作业共享集群 较短,立即启动 较低,集群长期存在
Application 强,作业独享集群 最长,等待集群创建完成 一般,作业结束后释放资源
Native Session 弱,作业共享集群 一般,等待 TaskManager 创建 较低,TaskManager 按需申请
Native Application 强,作业独占集群 一般, 等待集群创建完成 最好,集群按需创建

精彩评论(0)

0 0 举报