0
点赞
收藏
分享

微信扫一扫

【Flink on k8s】Flink on Kubernetes 部署模式

树下的老石头 2021-09-21 阅读 49

1.Flink on Kubernetes 的背景

Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的长稳性
Flink 特性:提供的实时服务是需要长时间、稳定地运行,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
Kubernetes 优势为应用提供了部署、管理能力,同时保证其稳定运行。Kubernetes 具有很好的生态,可以集成各种运维工具,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的扩缩容机制,可以大大提高资源利用率。

2.Flink Session 和 Application 模式

2.1 Session 模式

Session 模式简介

预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager。

优点:

Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即作业启动速度快

缺点:

资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足。
作业隔离性差,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启。

部署指导

参考:Flink on Standalone Kubernetes Reference
集群配置
集群配置通过 configmap 挂载到容器中
flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

Deployment 文件
Flink 镜像上传到私有镜像仓。编辑 jobmanager-service.yaml、jobmanager-deployment.yaml、taskmanager-deployment.yaml

jobmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

2.2 Application 模式

Application 模式简介

每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。

优点:

一个作业独占一个集群,作业的隔离性好

缺点:

资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用。

3.Flink Native Session 模式

3.1 Native Session 模式简介

类似 Session 模式,需要预先构建 JobManager。不同点是用户通过 Flink Client 向 JobManager 提交作业后,根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源,最后把作业提交到 TaskManager 上。

3.2 优缺点分析

优点:

TaskManager 的资源是实时的、按需进行的创建,对资源的利用率更高

缺点:

作业真正运行起来的时间较长,因为需要等待 TaskManager 创建。

3.3 部署指导

参考:Native Kubernetes - Session Mode
集群配置
集群配置通过 configmap 挂载到容器中,如上 2.1 所示。
新增如下配置:
flink-configuration-configmap.yaml

kubernetes.cluster-id: my-first-flink-cluster
execution.attached: true

② 配置 jobmanager-deployment.yaml
如上 2.1 所示,需要把启动脚本修改为 ./bin/kubernetes-session.sh

jobmanager-deployment.yaml

// 忽略...

    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/kubernetes-session.sh;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]

// 忽略...

执行 yaml
通过 kubectl create -f 命令创建 Flink 集群

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml

4.Flink Native Application 模式

4.1 Native Application 模式简介

类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性,即 Flink 直接与 Kubernetes 进行通信并按需申请资源,无需用户指定 TaskManager 资源的数量。

4.2 优缺点分析

优点:

① 一个作业独占一个集群,作业的隔离性好。
资源利用率相对较高,按需申请 JobManager 和 TaskManager。

缺点:

① 一个作业独占一个集群,JobManager 不能复用
作业启动较慢,在作业提交后,才开始创建 JobManager 和 TaskManager。

5.Flink 运行模式总结

模式 隔离性 作业启动时间 资源利用率 资源按需创建
Session 弱,作业共享集群 较短,立即启动 较低,集群长期存在
Application 强,作业独享集群 最长,等待集群创建完成 一般,作业结束后释放资源
Native Session 弱,作业共享集群 一般,等待 TaskManager 创建 较低,TaskManager 按需申请
Native Application 强,作业独占集群 一般, 等待集群创建完成 最好,集群按需创建
举报

相关推荐

0 条评论