0
点赞
收藏
分享

微信扫一扫

Flink Metrics Reporter源码分析(含详细实践操作)



文章目录

  • 01 引言
  • 02 案例
  • 2.1 环境准备
  • 2.2 安装Prometheus组件套
  • 2.3 安装PushGateway
  • 2.4 flink配置指标发送器
  • 03 Flink Metrics源码分析
  • 04 文末


01 引言

文档地址:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters

通过阅读Flink的官方文档,可以知道flink将各项运行时指标发送给外部系统,目前支持的外部系统包括如下:

外部系统

描述

JMX

当Flink将指标推送到JMX时,它将指标注册为MBean,并暴露给MBean服务器,外部系统通过JMX连接器定期从MBean服务器Pull这些指标数据,用于监控和管理Flink应用程序。

Graphite

Graphite服务器是一个开源的软件,用于实时监控和度量大规模分布式系统的性能,Flink应用程序会通过Graphite报告器将生成的指标数据Push到Graphite服务器,然后用户可以通过Graphite的Web界面方便地查看和分析这些数据,以监控和优化应用程序的性能

InfluxDB

InfluxDB是一个时间序列数据库,专门用于存储和查询时间相关的数据,比如指标、事件等,Flink通过InfluxDB连接器将生成的指标数据推送到InfluxDB中

Prometheus

Prometheus通过Flink暴露的HTTP端点直接Pull指标数据,用于监控和度量Flink应用程序的性能和状态

PrometheusPushGateway

Flink应用程序定期将指标数据推送到Prometheus PushGateway,然后Prometheus从PushGateway Pull这些指标数据,以实现对应用程序性能的实时监控和度量。对于Push类型的工具,我们可以在Flink配置文件中设定它们的报告间隔

StatsD

StatsD是一个网络守护进程,允许Flink应用程序将指标数据通过UDP协议发送到StatsD服务器,用于度量和监控应用程序的性能

Datadog

Datadog是一个监控和日志记录服务,允许Flink应用程序将指标数据推送到Datadog平台,以实现对应用程序性能的监控和分析

Slf4j

Slf4j是一个简单日志门面,Flink可以通过Slf4j记录指标数据到日志文件中用于监控和分析

得益于Flink的设计,我们只需要引入相应的jar包,并在flink-conf.yaml做简单的配置,程序启动时,jar包便会被自动加载为 plugins,无需做其他配置,开箱即用

当然我们也可以自定义指标发送器,只需要实现 org.apache.flink.metrics.reporter.MetricReporter 接口,并实现 Scheduled 接口让发送器周期性地将运行时指标发送出去

接下来讲解下简单的使用,以典型的Flink将指标推送至PrometheusPushGateway为例子,最后再分析源码。

02 案例

Flink推送指标给Push Gateway的流程图大致如下:

Flink Metrics Reporter源码分析(含详细实践操作)_flink


流程很简单,其实就是把指标推送给PushGateway,然后Prometheus定时从PushGateway获取指标,然后在Grafana可以直接通过promql去查询相关的Flink指标做图形化显示,并可以配置告警规则,到了一定的告警阈值,便可以通过Alerts manager发送到多个消息渠道。

2.1 环境准备

准备的物料清单如下:

环境

描述

k8s

Prometheus监控组件套,是在k8s安装的,

Prometheus相关k8s资源

相关资源有Prometheus、AlertManager、Grafana、Prometheus Operator等

flink 1.18.0 安装包

使用的是Flink1.18.0的安装包做演示

2.2 安装Prometheus组件套

因为这里不是本文的重点,所以这里就不再深入,默认用户已经安装好了

为了方便演示,我是直接安装了kubesphere了,因为里面自带了 Prometheus组件套,安装方式可以参考:《在 Kubernetes 上最小化安装 KubeSphere》。

安装成功之后,在dashboard可以看到在kubesphere-monitoring-system空间还帮我们安装好了很多的exporter了:

Flink Metrics Reporter源码分析(含详细实践操作)_源码分析_02

暴露端口后,可以在Prometheus控制台看到很多的exporter已经被Pormetheus检测到了,并从exporter里拉取数据:

Flink Metrics Reporter源码分析(含详细实践操作)_大数据_03


本文需要演示的是Flink指标推送到PushGateway,但是kubesphere是没有安装PushGateway的,所以还需要安装PushGateway,并能让Pormetheus Operator自动监控并更新到Pormetheus Server。

2.3 安装PushGateway

安装Prometheus PushGateway的资源文件如下:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: kube-pushgateway
  namespace: kubesphere-monitoring-system
  labels:
    app.kubernetes.io/component: kube-pushgateway
    app.kubernetes.io/name: kube-pushgateway
    app.kubernetes.io/part-of: kube-prometheus
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/component: kube-pushgateway
      app.kubernetes.io/name: kube-pushgateway
      app.kubernetes.io/part-of: kube-prometheus
  template:
    metadata:
      labels:
        app.kubernetes.io/component: kube-pushgateway
        app.kubernetes.io/name: kube-pushgateway
        app.kubernetes.io/part-of: kube-prometheus
    spec:
      containers:
        - name: pushgateway
          image: prom/pushgateway
          ports:
            - name: web
              containerPort: 9091
              protocol: TCP

---

kind: Service
apiVersion: v1
metadata:
  name: kube-pushgateway
  namespace: kubesphere-monitoring-system
  labels:
    app.kubernetes.io/component: kube-pushgateway
    app.kubernetes.io/name: kube-pushgateway
    app.kubernetes.io/part-of: kube-prometheus
spec:
  ports:
    - name: web
      protocol: TCP
      port: 9091
      targetPort: web
      nodePort: 32039
  selector:
    app.kubernetes.io/component: kube-pushgateway
    app.kubernetes.io/name: kube-pushgateway
    app.kubernetes.io/part-of: kube-prometheus
  type: NodePort

使用kubectl命令apply之后,可以看到创建了相关的pod以及service,而service暴露了端口为:32039。

那么Pormetheus是如何监控到PushGateway的呢?是否还需要对Prometheus进行配置?其实很简单,我们只需要再定义一个ServiceMonitor就可以了,原理图如下:

Flink Metrics Reporter源码分析(含详细实践操作)_监控告警_04


首先ServiceMonitor负责指定应如何监控Kubenetes Service,可以理解为关联Service开放的端口,并从这个端口获取指标。

Operator负责监听ServiceMonitor这些自定义资源的变化,并且根据这些资源的定义自动化的完成,如:Prometheus Server自身以及配置的自动化管理工作。是整个系统的控制中心(大脑)。

因此,我们配置ServiceMonitor,并使用kubectl命令去apply:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    app.kubernetes.io/name: kube-pushgateway
    app.kubernetes.io/part-of: kube-prometheus
    app.kubernetes.io/vendor: kubesphere
  name: kube-pushgateway
  namespace: kubesphere-monitoring-system
spec:
  endpoints:
    - interval: 5s
      port: web
      scheme: http
  jobLabel: app.kubernetes.io/name
  selector:
    matchLabels:
      app.kubernetes.io/component: kube-pushgateway
      app.kubernetes.io/name: kube-pushgateway
      app.kubernetes.io/part-of: kube-prometheus

创建成功之后,可以在Pormetheus的控制台看到PushGateway成功的被Prometheus监听到了:

Flink Metrics Reporter源码分析(含详细实践操作)_flink_05

自此,Prometheus监控套及PushGateway已经安装好了。对监控告警有兴趣的同学,可以参考我之前的专栏:《监控告警》

那么Flink如何推送指标到PushGateway呢?接下来继续讲解。

2.4 flink配置指标发送器

首先下载flink安装包,我下载的是flink1.18,下载地址:https://flink.apache.org/downloads/

Flink Metrics Reporter源码分析(含详细实践操作)_源码分析_06


解压,可以发现在"/flink-1.18.0/plugins"目录已经打包好了指标相关的插件JAR包,所以就无需引入了,使用内置的:

Flink Metrics Reporter源码分析(含详细实践操作)_实时计算_07


接着配置/flink-1.18.0/conf/flink-conf.yaml文件,末尾添加:

# 指定工厂类
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
# 指定PushGateway的主机地址
metrics.reporter.promgateway.host: localhost  
# 指定PushGateway的端口号 
metrics.reporter.promgateway.port: 32039  
# 设置PushGateway报告的作业名称    
metrics.reporter.promgateway.jobName: flink-metrics 
# 指定是否为PushGateway报告的作业名称添加随机后缀以防止命名冲突
metrics.reporter.promgateway.randomJobNameSuffix: true
# 指定Flink应用程序关闭时是否从PushGateway中删除作业度量信息
metrics.reporter.promgateway.deleteOnShutdown: false
# 指定推送指标收集间隔
metrics.reporter.promgateway.interval: 10 SECONDS

启动flink集群:

./flink-1.18.0/bin/start-cluster.sh

启动成功后,可以从日志看到flink已经成功加载了插件:

Flink Metrics Reporter源码分析(含详细实践操作)_实时计算_08


访问PushGateway的页面,也可以看到接收到来自flink发送的指标了。

Flink Metrics Reporter源码分析(含详细实践操作)_实时计算_09

那么继续使用Grafana监控指标,我们可以直接从Grafana的模板仓库导入模板:https://grafana.com/grafana/dashboards/?search=flink

Flink Metrics Reporter源码分析(含详细实践操作)_flink_10


找一个自己喜欢的模板,配置grafana,并关联prometheus数据源,在页面就可以监控Flink的指标了:

Flink Metrics Reporter源码分析(含详细实践操作)_实时计算_11

03 Flink Metrics源码分析

从github clone源码,并切换到release-1.18分支,在项目的目录可以看到flink指标相关的源码:

Flink Metrics Reporter源码分析(含详细实践操作)_监控告警_12


进入本文说的PushGateway方式的目录:

Flink Metrics Reporter源码分析(含详细实践操作)_flink_13

可以看到上图的3个类为核心的类:

  • PrometheusPushGatewayReporterOptions:对应的flink-conf.yaml里面的配置;
  • PrometheusPushGatewayReporterFactory:程序启动时,使用SPI的方式加载该工厂,并初始化Reporter;
  • PrometheusPushGatewayReporter:主要做上报指标的操作;

代码其实是很清晰的,截图如下:

Flink Metrics Reporter源码分析(含详细实践操作)_实时计算_14


PrometheusPushGatewayReporterFactory工厂如下:

Flink Metrics Reporter源码分析(含详细实践操作)_源码分析_15


主要是Reporter根据配置的时间,定时触发:

Flink Metrics Reporter源码分析(含详细实践操作)_源码分析_16

ok,就是这么简单,我们也可以扩展很多的exporter的,只要实现简单的工厂,并创建对应的Scheduled的逻辑即可。

那么report在哪里触发呢?经过查询,是程序启动时,加载完Reporter插件之后,继续初始化org.apache.flink.runtime.metrics.MetricRegistryImpl类的时候初始化了一个定时器:

Flink Metrics Reporter源码分析(含详细实践操作)_大数据_17


定时器里面的ReporterTask执行了report方法:

Flink Metrics Reporter源码分析(含详细实践操作)_大数据_18

那么上报的指标有哪些呢?细心的我们会发现PrometheusPushGatewayReporter还继承了AbstractPrometheusReporter基类,这个类里面已经写了一些添加指标的操作:

Flink Metrics Reporter源码分析(含详细实践操作)_源码分析_19


可以看到,指标类型分为上图的几种,如果用简单的话整理,即有如下分类:

  • GAUGE(仪表):表示一个单一的数值,可增可减,常用于表示一些瞬时状态,如当前的队列长度、连接数等。
  • COUNTER(计数器):表示一个单调递增的数值,常用于度量事件发生的次数,例如请求数、错误数等。
  • METER(计量器):类似于计数器,但可以测量每秒发生的事件数(速率),常用于度量吞吐量。
  • HISTOGRAM(直方图):用于度量数据的分布情况,通常会将数据分成若干个桶(buckets),并记录每个桶中数据的个数或总和。

04 文末

本文主要讲解了Flink Metrics Reporter相关的使用实践与部分源码分析,如果想深入Flink的指标含义,可以参考官方文档:https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/,本文不在细述。希望能帮助到大家,谢谢的大家的阅读,本文完!


举报

相关推荐

0 条评论