原理说明
Elasticsearch 是一个实时的分布式的可扩展的使用REST接口的搜索引擎,允许进行全文、结构化搜索,它通常用于索引和搜索大量日志数据,也可用于搜索许多不同类型的文档。它基于Apache Lucene (TM)的开源搜索引擎,Lucene 非常复杂,你需要深入的了解检索相关知识来理解它是如何工作的。Elasticsearch 也是使用 Java 编写并使用 Lucene 来建立索引并实现搜索功能。有一个简单的与MySQL对应关系,可以理解为:
Kibana 是 Elasticsearch 的一个功能强大的开源的分析和可视化Dashboard,你可以使用它来搜索,查看存储在 ES 索引中的数据并与之交互。可以很容易实现高级的数据分析和可视化,以图标的形式展现出来,因此Kibana属于web框架程序,熟练使用是基于安装、自定义索引,搜索,控制台调用es的api和可视化等操作,特别需要注意的是,控制台可以非常方便的来调用es的api,强烈推荐使用。类似phpadmin操作mysql。
Fluentd是一个流行的开源数据收集器,我们将在 Kubernetes 集群节点上安装 Fluentd,通过获取容器日志文件、过滤和转换日志数据,然后将数据传递到 Elasticsearch 集群,在该集群中对其进行索引和存储。
架构设计
我们先使用StatufulSet来配置启动一个可扩展的 Elasticsearch 集群,然后在 Kubernetes 集群中创建一个 Kibana 应用,最后通过 DaemonSet 来运行 Fluentd,以便它在每个 Kubernetes 工作节点上都可以运行一个 Pod。
1:配置ES集群 2: 配置kibana应用,启用一个web可视化页面 3:启用多个Fluentd收集器 |
部署ES
创建命名空间
#生成命名空间 apiVersion: v1 kind: Namespace metadata: name: logging |
定义ES无头服务
#因为StatefulSet控制器与此服务关联; kind: Service apiVersion: v1 metadata: name: elasticsearch namespace: logging labels: app: elasticsearch spec: selector: #没有selector标签选择器称为外部服务,因为没有真实的pod对应,比如mysql app: elasticsearch type: NodePort #clusterIP常规内部集群服务,NodePort是常规外部集群服务,暴露在每个节点上,都有vip; clusterIP: None #如何写是指定;如果不写是随机;如果None称为无头服务,比如redis/es集群 ports: - port: 9200 name: rest - port: 9300 name: inter-node |
定义ES无状态POD
#通过 StatefulSet 来创建具体的 Elasticsearch 的 Pod 应用 apiVersion: apps/v1 kind: StatefulSet metadata: name: es namespace: logging spec: serviceName: elasticsearch replicas: 3 selector: matchLabels: app: elasticsearch template: metadata: labels: app: elasticsearch spec: nodeSelector: es: log initContainers: - name: increase-vm-max-map image: busybox command: ["sysctl", "-w", "vm.max_map_count=262144"] securityContext: privileged: true - name: increase-fd-ulimit image: busybox command: ["sh", "-c", "ulimit -n 65536"] securityContext: privileged: true containers: - name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2 ports: - name: rest containerPort: 9200 - name: inter containerPort: 9300 resources: limits: cpu: 1000m requests: cpu: 1000m volumeMounts: - name: data mountPath: /usr/share/elasticsearch/data env: - name: cluster.name value: k8s-logs - name: node.name valueFrom: fieldRef: fieldPath: metadata.name - name: cluster.initial_master_nodes value: "es-0,es-1,es-2" - name: discovery.zen.minimum_master_nodes value: "2" - name: discovery.seed_hosts value: "elasticsearch" - name: ES_JAVA_OPTS value: "-Xms512m -Xmx512m" - name: network.host value: "0.0.0.0" volumeClaimTemplates: - metadata: name: data labels: app: elasticsearch spec: accessModes: [ "ReadWriteOnce" ] storageClassName: rook-ceph-block resources: requests: storage: 50Gi 验证:kubectl port-forward es-0 9200:9200 --namespace=logging |
开始部署Kibana
定义kibana服务
apiVersion: v1 kind: Service metadata: name: kibana namespace: logging labels: app: kibana spec: ports: - port: 5601 type: NodePort #必须映射到外部来,供外部使用 selector: app: kibana |
定义真实POD
#创建一个kibana.yaml文件 apiVersion: apps/v1 kind: Deployment metadata: name: kibana namespace: logging labels: app: kibana spec: selector: matchLabels: app: kibana template: metadata: labels: app: kibana spec: nodeSelector: es: log containers: - name: kibana image: docker.elastic.co/kibana/kibana:7.6.2 resources: limits: cpu: 1000m requests: cpu: 1000m env: - name: ELASTICSEARCH_HOSTS value: http://elasticsearch:9200 ports: - containerPort: 5601 部署这个应用 $ kubectl create -f kibana.yaml service/kibana created deployment.apps/kibana created |
开始部署Fluentd
概念介绍
Fluentd 是一个高效的日志聚合器,是用 Ruby 编写的,并且可以很好地扩展。对于大部分企业来说,Fluentd 足够高效并且消耗的资源相对较少,另外一个工具 Fluent-bit更轻量级,占用资源更少,但是插件相对 Fluentd 来说不够丰富,所以整体来说,Fluentd 更加成熟,使用更加广泛,所以我们这里也同样使用 Fluentd 来作为日志收集工具。
Fluentd 通过一组给定的数据源抓取日志数据,处理后(转换成结构化的数据格式)将它们转发给其他服务,比如 Elasticsearch、对象存储等等。Fluentd 支持超过300个日志存储和分析服务,所以在这方面是非常灵活的。主要运行步骤如下:
定义配置文件
# 配置文件 kind: ConfigMap apiVersion: v1 metadata: name: fluentd-config namespace: logging data: system.conf: |- <system> root_dir /tmp/fluentd-buffers/ </system> containers.input.conf: |- <source> @id fluentd-containers.log @type tail # Fluentd 内置的输入方式,其原理是不停地从源文件中获取新的日志 path /var/log/containers/*.log # 挂载的服务器Docker容器日志地址 pos_file /var/log/es-containers.log.pos tag raw.kubernetes.* # 设置日志标签 read_from_head true <parse> # 多行格式化成JSON @type multi_format # 使用 multi-format-parser 解析器插件 <pattern> format json # JSON解析器 time_key time # 指定事件时间的时间字段 time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式 </pattern> <pattern> format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/ time_format %Y-%m-%dT%H:%M:%S.%N%:z </pattern> </parse> </source> # 在日志输出中检测异常,并将其作为一条日志转发 # https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions <match raw.kubernetes.**> # 匹配tag为raw.kubernetes.**日志信息 @id raw.kubernetes @type detect_exceptions # 使用detect-exceptions插件处理异常栈信息 remove_tag_prefix raw # 移除 raw 前缀 message log stream stream multiline_flush_interval 5 max_bytes 500000 max_lines 1000 </match> <filter **> # 拼接日志 @id filter_concat @type concat # Fluentd Filter 插件,用于连接多个事件中分隔的多行日志。 key message multiline_end_regexp /\n$/ # 以换行符“\n”拼接 separator "" </filter> # 添加 Kubernetes metadata 数据 <filter kubernetes.**> @id filter_kubernetes_metadata @type kubernetes_metadata </filter> # 修复 ES 中的 JSON 字段 # 插件地址:https://github.com/repeatedly/fluent-plugin-multi-format-parser <filter kubernetes.**> @id filter_parser @type parser # multi-format-parser多格式解析器插件 key_name log # 在要解析的记录中指定字段名称。 reserve_data true # 在解析结果中保留原始键值对。 remove_key_name_field true # key_name 解析成功后删除字段。 <parse> @type multi_format <pattern> format json </pattern> <pattern> format none </pattern> </parse> </filter> # 删除一些多余的属性 <filter kubernetes.**> @type record_transformer remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash </filter> # 只保留具有logging=true标签的Pod日志 <filter kubernetes.**> @id filter_log @type grep <regexp> key $.kubernetes.labels.logging pattern ^true$ </regexp> </filter> ###### 监听配置,一般用于日志聚合用 ###### forward.input.conf: |- # 监听通过TCP发送的消息 <source> @id forward @type forward </source> output.conf: |- <match **> @id elasticsearch @type elasticsearch @log_level info include_tag_key true host elasticsearch port 9200 logstash_format true logstash_prefix k8s # 设置 index 前缀为 k8s request_timeout 30s <buffer> @type file path /var/log/fluentd-buffers/kubernetes.system.buffer flush_mode interval retry_type exponential_backoff flush_thread_count 2 flush_interval 5s retry_forever retry_max_interval 30 chunk_limit_size 2M queue_limit_length 8 overflow_action block </buffer> </match> 上面配置文件中我们只配置了 docker 容器日志目录,收集到数据经过处理后发送到 elasticsearch:9200 服务。 |
定义真实POD
#生成自用的公钥私钥key,创建一个apiserver-csr.json apiVersion: v1 kind: ServiceAccount metadata: name: fluentd-es namespace: logging labels: k8s-app: fluentd-es kubernetes.io/cluster-service: "true" addonmanager.kubernetes.io/mode: Reconcile --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: "true" addonmanager.kubernetes.io/mode: Reconcile rules: - apiGroups: - "" resources: - "namespaces" - "pods" verbs: - "get" - "watch" - "list" --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: "true" addonmanager.kubernetes.io/mode: Reconcile subjects: - kind: ServiceAccount name: fluentd-es namespace: logging apiGroup: "" roleRef: kind: ClusterRole name: fluentd-es apiGroup: "" --- apiVersion: apps/v1 kind: DaemonSet metadata: name: fluentd-es namespace: logging labels: k8s-app: fluentd-es kubernetes.io/cluster-service: "true" addonmanager.kubernetes.io/mode: Reconcile spec: selector: matchLabels: k8s-app: fluentd-es template: metadata: labels: k8s-app: fluentd-es kubernetes.io/cluster-service: "true" # 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案 annotations: scheduler.alpha.kubernetes.io/critical-pod: '' spec: serviceAccountName: fluentd-es containers: - name: fluentd-es image: quay.io/fluentd_elasticsearch/fluentd:v3.0.1 env: - name: FLUENTD_ARGS value: --no-supervisor -q resources: limits: memory: 500Mi requests: cpu: 100m memory: 200Mi volumeMounts: - name: varlog mountPath: /var/log - name: varlibdockercontainers mountPath: /data/docker/containers readOnly: true - name: config-volume mountPath: /etc/fluent/config.d nodeSelector: beta.kubernetes.io/fluentd-ds-ready: "true" tolerations: - operator: Exists terminationGracePeriodSeconds: 30 volumes: - name: varlog hostPath: path: /var/log - name: varlibdockercontainers hostPath: path: /data/docker/containers - name: config-volume configMap: name: fluentd-config $ kubectl create -f fluentd-configmap.yaml configmap "fluentd-config" created $ kubectl create -f fluentd-daemonset.yaml serviceaccount "fluentd-es" created clusterrole.rbac.authorization.k8s.io "fluentd-es" created clusterrolebinding.rbac.authorization.k8s.io "fluentd-es" created daemonset.apps "fluentd-es" created 将上面创建的 fluentd-config 这个 ConfigMap 对象通过 volumes 挂载到了 Fluentd 容器中,另外为了能够灵活控制哪些节点的日志可以被收集,所以我们这里还添加了一个 nodSelector 属性 kubectl label nodes node名beta.kubernetes.io/fluentd-ds-ready=true |
开始部署beat
概念介绍
Beats在是一个轻量级日志采集器,其实Beats家族有6个成员,包括filebeat。
Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。它的工作方式是:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到Filebeat配置的输出。
filebeat结构:由两个组件构成,分别是inputs(输入)和harvesters(收集器)
harvester负责读取单个文件的内容。harvester逐行读取每个文件,并将内容发送到输出。为每个文件启动一个harvester。harvester负责打开和关闭文件,这意味着文件描述符在harvester运行时保持打开状态。如果在收集文件时删除或重命名文件,Filebeat将继续读取该文件。这样做的副作用是,磁盘上的空间一直保留到harvester关闭。默认情况下,Filebeat保持文件打开,直到达到close_inactive。
定义配置文件
# 配置文件 --- apiVersion: v1 kind: ConfigMap metadata: name: filebeat-config namespace: logging labels: k8s-app: filebeat data: filebeat.yml: |- filebeat.config: #inputs: # type: log # enabled: true # paths: # - "/var/log/containers/*.log" # Reload inputs configs as they change: # reload.enabled: false # symlinks: true #modules: # path: ${path.config}/modules.d/*.yml # Reload module configs as they change: # reload.enabled: false filebeat.autodiscover: # 使用filebeat自动发现的方式 providers: - type: kubernetes templates: - condition: equals: kubernetes.namespace: "test-taxi" # 收集prod命名空间的日志 config: - type: container # 日志类型为log而非docker或者container,因为我们输出的日志非json格式。 containers.ids: - "*" paths: - "/var/log/containers/${data.kubernetes.container.id}/*.log" encoding: utf-8 scan_frequency: 10s # 扫描新文件的时间间隔,默认为10秒 tail_files: true fields_under_root: true # 设置为true后,fields存储在输出文档的顶级位置 fields: index: "test-info" processors: - add_kubernetes_metadata: in_cluster: true - drop_fields: #删除的多余字段 fields: ["host", "tags", "ecs", "log", "prospector", "agent", "input", "beat", "offset"] ignore_missing: true - type: kubernetes templates: - condition: equals: kubernetes.namespace: "test-taxi" config: - type: container containers.ids: - "*" paths: - "/var/log/containers/${data.kubernetes.container.id}/*.log" encoding: utf-8 scan_frequency: 10s tail_files: true fields_under_root: true fields: index: "prod-error" multiline.type: pattern multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:' multiline.negate: false multiline.match: after processors: - add_kubernetes_metadata: in_cluster: true - drop_fields: fields: ["host", "tags", "ecs", "log", "prospector", "agent", "input", "beat", "offset"] ignore_missing: true setup.ilm.enabled: false output.elasticsearch: hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}'] indices: - index: "k8s-test-info-%{+yyyy.MM.dd}" when.contains: fields: index: "test-info" - index: "k8s-prod-error-%{+yyyy.MM.dd}" when.contains: fields: index: "prod-error" |
定义真实POD
# 配置文件 --- apiVersion: extensions/v1beta1 kind: DaemonSet metadata: name: filebeat namespace: efk labels: k8s-app: filebeat spec: template: metadata: labels: k8s-app: filebeat spec: serviceAccountName: filebeat terminationGracePeriodSeconds: 30 containers: - name: filebeat image: elastic/filebeat:7.10.1 args: [ "-c", "/etc/filebeat.yml", "-e", "-d", "*", ] env: - name: ELASTICSEARCH_HOST value: es - name: ELASTICSEARCH_PORT value: "9200" - name: ELASTICSEARCH_USERNAME value: elastic - name: ELASTICSEARCH_PASSWORD value: elastic - name: ELASTIC_CLOUD_ID value: - name: ELASTIC_CLOUD_AUTH value: securityContext: runAsUser: 0 # If using Red Hat OpenShift uncomment this: #privileged: true resources: limits: memory: 200Mi requests: cpu: 100m memory: 100Mi volumeMounts: - name: config mountPath: /etc/filebeat.yml readOnly: true subPath: filebeat.yml - name: inputs mountPath: /usr/share/filebeat/inputs.d readOnly: true - name: data mountPath: /usr/share/filebeat/data - name: varlibdockercontainers mountPath: /var/log/containers readOnly: true volumes: - name: config configMap: defaultMode: 0600 name: filebeat-config - name: varlibdockercontainers hostPath: path: /data/docker/data/containers - name: inputs configMap: defaultMode: 0600 name: filebeat-inputs - name: data hostPath: path: /var/lib/filebeat-data type: DirectoryOrCreate --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: filebeat subjects: - kind: ServiceAccount name: filebeat namespace: efk roleRef: kind: ClusterRole name: filebeat apiGroup: rbac.authorization.k8s.io --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: name: filebeat labels: k8s-app: filebeat rules: - apiGroups: [""] # "" indicates the core API group resources: - namespaces - pods - containers verbs: - get - watch - list --- apiVersion: v1 kind: ServiceAccount metadata: name: filebeat namespace: efk labels: k8s-app: filebeat --- |
支持的命令
export #导出
run #执行(默认执行)
test #测试配置
keystore #秘钥存储
modules #模块配置管理
setup #设置初始环境
配置文件说明
# 配置文件 type: log #input类型为log enable: true #表示是该log类型配置生效 paths: #指定要监控的日志,目前按照Go语言的glob函数处理。没有对配置目录做递归处理,比如配置的如果是: - /var/log/* /*.log #则只会去/var/log目录的所有子目录中寻找以".log"结尾的文件,而不会寻找/var/log目录下以".log"结尾的文件。 recursive_glob.enabled: #启用全局递归模式,例如/foo/**包括/foo, /foo/*, /foo/*/* encoding:#指定被监控的文件的编码类型,使用plain和utf-8都是可以处理中文日志的 exclude_lines: ['^DBG'] #不包含匹配正则的行 include_lines: ['^ERR', '^WARN'] #包含匹配正则的行 harvester_buffer_size: 16384 #每个harvester在获取文件时使用的缓冲区的字节大小 max_bytes: 10485760 #单个日志消息可以拥有的最大字节数。max_bytes之后的所有字节都被丢弃而不发送。默认值为10MB (10485760) exclude_files: ['\.gz$'] #用于匹配希望Filebeat忽略的文件的正则表达式列表 ingore_older: 0 #默认为0,表示禁用,可以配置2h,2m等,注意ignore_older必须大于close_inactive的值.表示忽略超过设置值未更新的 文件或者文件从来没有被harvester收集 close_* #close_ *配置选项用于在特定标准或时间之后关闭harvester。 关闭harvester意味着关闭文件处理程序。 如果在harvester关闭 后文件被更新,则在scan_frequency过后,文件将被重新拾取。 但是,如果在harvester关闭时移动或删除文件,Filebeat将无法再次接收文件 ,并且harvester未读取的任何数据都将丢失。 close_inactive #启动选项时,如果在制定时间没有被读取,将关闭文件句柄 读取的最后一条日志定义为下一次读取的起始点,而不是基于文件的修改时间 如果关闭的文件发生变化,一个新的harverster将在scan_frequency运行后被启动 建议至少设置一个大于读取日志频率的值,配置多个prospector来实现针对不同更新速度的日志文件 使用内部时间戳机制,来反映记录日志的读取,每次读取到最后一行日志时开始倒计时使用2h 5m 来表示 close_rename #当选项启动,如果文件被重命名和移动,filebeat关闭文件的处理读取 close_removed #当选项启动,文件被删除时,filebeat关闭文件的处理读取这个选项启动后,必须启动clean_removed close_eof #适合只写一次日志的文件,然后filebeat关闭文件的处理读取 close_timeout #当选项启动时,filebeat会给每个harvester设置预定义时间,不管这个文件是否被读取,达到设定时间后,将被关闭 close_timeout 不能等于ignore_older,会导致文件更新时,不会被读取如果output一直没有输出日志事件,这个timeout是不会被启动的, 至少要要有一个事件发送,然后haverter将被关闭 设置0 表示不启动 clean_inactived #从注册表文件中删除先前收获的文件的状态 设置必须大于ignore_older+scan_frequency,以确保在文件仍在收集时没有删除任何状态 配置选项有助于减小注册表文件的大小,特别是如果每天都生成大量的新文件 此配置选项也可用于防止在Linux上重用inode的Filebeat问题 clean_removed #启动选项后,如果文件在磁盘上找不到,将从注册表中清除filebeat 如果关闭close removed 必须关闭clean removed scan_frequency #prospector检查指定用于收获的路径中的新文件的频率,默认10s tail_files:#如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送, 而不是从文件开始处重新发送所有内容。 symlinks:#符号链接选项允许Filebeat除常规文件外,可以收集符号链接。收集符号链接时,即使报告了符号链接的路径, Filebeat也会打开并读取原始文件。 backoff: #backoff选项指定Filebeat如何积极地抓取新文件进行更新。默认1s,backoff选项定义Filebeat在达到EOF之后 再次检查文件之间等待的时间。 max_backoff: #在达到EOF之后再次检查文件之前Filebeat等待的最长时间 backoff_factor: #指定backoff尝试等待时间几次,默认是2 harvester_limit:#harvester_limit选项限制一个prospector并行启动的harvester数量,直接影响文件打开数 tags #列表中添加标签,用过过滤,例如:tags: ["json"] fields #可选字段,选择额外的字段进行输出可以是标量值,元组,字典等嵌套类型 默认在sub-dictionary位置 filebeat.inputs: fields: app_id: query_engine_12 fields_under_root #如果值为ture,那么fields存储在输出文档的顶级位置 multiline.pattern #必须匹配的regexp模式 multiline.negate #定义上面的模式匹配条件的动作是 否定的,默认是false 假如模式匹配条件'^b',默认是false模式,表示讲按照模式匹配进行匹配 将不是以b开头的日志行进行合并 如果是true,表示将不以b开头的日志行进行合并 multiline.match # 指定Filebeat如何将匹配行组合成事件,在之前或者之后,取决于上面所指定的negate multiline.max_lines #可以组合成一个事件的最大行数,超过将丢弃,默认500 multiline.timeout #定义超时时间,如果开始一个新的事件在超时时间内没有发现匹配,也将发送日志,默认是5smax_procs #设置可以同时执行的最大CPU数。默认值为系统中可用的逻辑CPU的数量。name #为该filebeat指定名字,默认为主机的hostname |