文章目录
- 01 引言
- 02 案例
- 2.1 环境准备
- 2.2 安装Prometheus组件套
- 2.3 安装PushGateway
- 2.4 flink配置指标发送器
- 03 Flink Metrics源码分析
- 04 文末
01 引言
文档地址:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters
通过阅读Flink的官方文档,可以知道flink将各项运行时指标发送给外部系统,目前支持的外部系统包括如下:
外部系统 | 描述 |
JMX | 当Flink将指标推送到JMX时,它将指标注册为MBean,并暴露给MBean服务器,外部系统通过JMX连接器定期从MBean服务器Pull这些指标数据,用于监控和管理Flink应用程序。 |
Graphite | Graphite服务器是一个开源的软件,用于实时监控和度量大规模分布式系统的性能,Flink应用程序会通过Graphite报告器将生成的指标数据Push到Graphite服务器,然后用户可以通过Graphite的Web界面方便地查看和分析这些数据,以监控和优化应用程序的性能 |
InfluxDB | InfluxDB是一个时间序列数据库,专门用于存储和查询时间相关的数据,比如指标、事件等,Flink通过InfluxDB连接器将生成的指标数据推送到InfluxDB中 |
Prometheus | Prometheus通过Flink暴露的HTTP端点直接Pull指标数据,用于监控和度量Flink应用程序的性能和状态 |
PrometheusPushGateway | Flink应用程序定期将指标数据推送到Prometheus PushGateway,然后Prometheus从PushGateway Pull这些指标数据,以实现对应用程序性能的实时监控和度量。对于Push类型的工具,我们可以在Flink配置文件中设定它们的报告间隔 |
StatsD | StatsD是一个网络守护进程,允许Flink应用程序将指标数据通过UDP协议发送到StatsD服务器,用于度量和监控应用程序的性能 |
Datadog | Datadog是一个监控和日志记录服务,允许Flink应用程序将指标数据推送到Datadog平台,以实现对应用程序性能的监控和分析 |
Slf4j | Slf4j是一个简单日志门面,Flink可以通过Slf4j记录指标数据到日志文件中用于监控和分析 |
得益于Flink
的设计,我们只需要引入相应的jar
包,并在flink-conf.yaml
做简单的配置,程序启动时,jar包便会被自动加载为 plugins,无需做其他配置,开箱即用。
当然我们也可以自定义指标发送器,只需要实现 org.apache.flink.metrics.reporter.MetricReporter
接口,并实现 Scheduled
接口让发送器周期性地将运行时指标发送出去。
接下来讲解下简单的使用,以典型的Flink将指标推送至PrometheusPushGateway为例子,最后再分析源码。
02 案例
Flink推送指标给Push Gateway的流程图大致如下:
流程很简单,其实就是把指标推送给PushGateway,然后Prometheus定时从PushGateway获取指标,然后在Grafana可以直接通过promql去查询相关的Flink指标做图形化显示,并可以配置告警规则,到了一定的告警阈值,便可以通过Alerts manager发送到多个消息渠道。
2.1 环境准备
准备的物料清单如下:
环境 | 描述 |
k8s | Prometheus监控组件套,是在k8s安装的, |
Prometheus相关k8s资源 | 相关资源有Prometheus、AlertManager、Grafana、Prometheus Operator等 |
flink 1.18.0 安装包 | 使用的是Flink1.18.0的安装包做演示 |
2.2 安装Prometheus组件套
因为这里不是本文的重点,所以这里就不再深入,默认用户已经安装好了
为了方便演示,我是直接安装了kubesphere
了,因为里面自带了 Prometheus
组件套,安装方式可以参考:《在 Kubernetes 上最小化安装 KubeSphere》。
安装成功之后,在dashboard
可以看到在kubesphere-monitoring-system
空间还帮我们安装好了很多的exporter了:
暴露端口后,可以在Prometheus控制台看到很多的exporter已经被Pormetheus检测到了,并从exporter里拉取数据:
本文需要演示的是Flink指标推送到PushGateway,但是kubesphere是没有安装PushGateway的,所以还需要安装PushGateway,并能让Pormetheus Operator自动监控并更新到Pormetheus Server。
2.3 安装PushGateway
安装Prometheus PushGateway的资源文件如下:
kind: Deployment
apiVersion: apps/v1
metadata:
name: kube-pushgateway
namespace: kubesphere-monitoring-system
labels:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
template:
metadata:
labels:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
spec:
containers:
- name: pushgateway
image: prom/pushgateway
ports:
- name: web
containerPort: 9091
protocol: TCP
---
kind: Service
apiVersion: v1
metadata:
name: kube-pushgateway
namespace: kubesphere-monitoring-system
labels:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
spec:
ports:
- name: web
protocol: TCP
port: 9091
targetPort: web
nodePort: 32039
selector:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
type: NodePort
使用kubectl命令apply之后,可以看到创建了相关的pod以及service,而service暴露了端口为:32039。
那么Pormetheus是如何监控到PushGateway的呢?是否还需要对Prometheus进行配置?其实很简单,我们只需要再定义一个ServiceMonitor就可以了,原理图如下:
首先ServiceMonitor负责指定应如何监控Kubenetes Service,可以理解为关联Service开放的端口,并从这个端口获取指标。
Operator负责监听ServiceMonitor这些自定义资源的变化,并且根据这些资源的定义自动化的完成,如:Prometheus Server自身以及配置的自动化管理工作。是整个系统的控制中心(大脑)。
因此,我们配置ServiceMonitor,并使用kubectl命令去apply:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
app.kubernetes.io/vendor: kubesphere
name: kube-pushgateway
namespace: kubesphere-monitoring-system
spec:
endpoints:
- interval: 5s
port: web
scheme: http
jobLabel: app.kubernetes.io/name
selector:
matchLabels:
app.kubernetes.io/component: kube-pushgateway
app.kubernetes.io/name: kube-pushgateway
app.kubernetes.io/part-of: kube-prometheus
创建成功之后,可以在Pormetheus的控制台看到PushGateway成功的被Prometheus监听到了:
自此,Prometheus监控套及PushGateway已经安装好了。对监控告警有兴趣的同学,可以参考我之前的专栏:《监控告警》
那么Flink如何推送指标到PushGateway呢?接下来继续讲解。
2.4 flink配置指标发送器
首先下载flink安装包,我下载的是flink1.18,下载地址:https://flink.apache.org/downloads/
解压,可以发现在"/flink-1.18.0/plugins
"目录已经打包好了指标相关的插件JAR包,所以就无需引入了,使用内置的:
接着配置/flink-1.18.0/conf/flink-conf.yaml
文件,末尾添加:
# 指定工厂类
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
# 指定PushGateway的主机地址
metrics.reporter.promgateway.host: localhost
# 指定PushGateway的端口号
metrics.reporter.promgateway.port: 32039
# 设置PushGateway报告的作业名称
metrics.reporter.promgateway.jobName: flink-metrics
# 指定是否为PushGateway报告的作业名称添加随机后缀以防止命名冲突
metrics.reporter.promgateway.randomJobNameSuffix: true
# 指定Flink应用程序关闭时是否从PushGateway中删除作业度量信息
metrics.reporter.promgateway.deleteOnShutdown: false
# 指定推送指标收集间隔
metrics.reporter.promgateway.interval: 10 SECONDS
启动flink集群:
./flink-1.18.0/bin/start-cluster.sh
启动成功后,可以从日志看到flink已经成功加载了插件:
访问PushGateway的页面,也可以看到接收到来自flink发送的指标了。
那么继续使用Grafana监控指标,我们可以直接从Grafana的模板仓库导入模板:https://grafana.com/grafana/dashboards/?search=flink
找一个自己喜欢的模板,配置grafana,并关联prometheus数据源,在页面就可以监控Flink的指标了:
03 Flink Metrics源码分析
从github clone源码,并切换到release-1.18
分支,在项目的目录可以看到flink指标相关的源码:
进入本文说的PushGateway方式的目录:
可以看到上图的3个类为核心的类:
- PrometheusPushGatewayReporterOptions:对应的flink-conf.yaml里面的配置;
- PrometheusPushGatewayReporterFactory:程序启动时,使用SPI的方式加载该工厂,并初始化Reporter;
- PrometheusPushGatewayReporter:主要做上报指标的操作;
代码其实是很清晰的,截图如下:
PrometheusPushGatewayReporterFactory工厂如下:
主要是Reporter根据配置的时间,定时触发:
ok,就是这么简单,我们也可以扩展很多的exporter的,只要实现简单的工厂,并创建对应的Scheduled的逻辑即可。
那么report在哪里触发呢?经过查询,是程序启动时,加载完Reporter插件之后,继续初始化org.apache.flink.runtime.metrics.MetricRegistryImpl
类的时候初始化了一个定时器:
定时器里面的ReporterTask执行了report方法:
那么上报的指标有哪些呢?细心的我们会发现PrometheusPushGatewayReporter还继承了AbstractPrometheusReporter基类,这个类里面已经写了一些添加指标的操作:
可以看到,指标类型分为上图的几种,如果用简单的话整理,即有如下分类:
- GAUGE(仪表):表示一个单一的数值,可增可减,常用于表示一些瞬时状态,如当前的队列长度、连接数等。
- COUNTER(计数器):表示一个单调递增的数值,常用于度量事件发生的次数,例如请求数、错误数等。
- METER(计量器):类似于计数器,但可以测量每秒发生的事件数(速率),常用于度量吞吐量。
- HISTOGRAM(直方图):用于度量数据的分布情况,通常会将数据分成若干个桶(buckets),并记录每个桶中数据的个数或总和。
04 文末
本文主要讲解了Flink Metrics Reporter相关的使用实践与部分源码分析,如果想深入Flink的指标含义,可以参考官方文档:https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/,本文不在细述。希望能帮助到大家,谢谢的大家的阅读,本文完!