0
点赞
收藏
分享

微信扫一扫

日志收集系统方案对比&选型&实现

背景

我们这边应用部署的环境比较复杂,主要有以下几种:

  • 机器直接部署
  • 通过kubernates集群部署

部署环境不统一,导致查看应用日志很不方便。

业界日志系统架构

  • Collector的作用是:

    • 清洗、汇聚数据,减少对于后端集群的压力。
    • 安全,不允许Agent直连kafka等内部集群,保证一定的安全性,即使后端发生调整也能保证对于Agent连接、认证方式的稳定。
  • MQ的作用是削峰填谷、解耦、多次消费。

    组件选择

    选择组件,我们这边主要是从以下几个方面进行考量的:

    1. 组件对应的开源生态完整、活跃度高
    2. 运维成本
    3. 易部署、性能好

    Agent: 主要比较下相对熟悉的Logstash和filebeat。

    指标 Logstash filebeat Log-pilot
    内存
    背压敏感协议
    插件
    功能 从多种输入端采集并实时解析和转换数据并输出到多种输出端 传输 支持收集容器的标准输出日志和文件
    轻重
    过滤功能 强大的过滤功能 有过滤功能但较弱 Log-pilot支持不同的插件收集日志( Filebeat PluginFluentd Plugin
    集群 单节点 单节点 单节点
    输出到多个接收方 支持 支持 支持
    原理 Logstash使用管道的方式进行日志的搜集和输出,分为输入input-->处理filter(不是必须的)-->输出output,每个阶段都有不同的替代方式 开启进程后会启动一个或者多个探测器(prospectors)去检测指定的日志目录或者文档建,对于探测器找出的每个日志文件,filebeat启动收割进程(harvester),每个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到你指定的地点

    一个好的agent应该是资源占用少,性能好,不依赖别的组件,可以独立部署。而Logstash明显不符合这几点要求,也许正是基于这些考虑elastic推出了Filebeat。目前log-pilot文档相对较少,如果要对filebeat做特定的配置,需要重新对log-pilot镜像配置完以后重打镜像(譬如异常日志多行合并),且目前log-pilot阿里目前已经不再维护了,社区活跃度不够。综上所述,还是采用原生的Filebeat进行日志收集。

    Collector暂时不用,直接来看看MQ
    MQ:
    filebeat的输出管道类型包括:
    Elasticsearch ServiceElasticsearchLogstashKafkaRedisFileConsole目前的方案采用的是kafka作削峰填谷、解耦。
    ETL:采用Logstash
    Storage and Search:采用ElasticSearch、Kibana
    到这里,基本可以看出我们的架构如下:


    具体实现
    Agent日志收集方案:
    k8s日志收集filebeat日志收集方案:

编号 方案 优点 缺点
1 每个app的镜像中都集成日志收集组件 部署方便,kubernetes的yaml文件无须特别配置,可以为每个app自定义日志收集配置 强耦合,不方便应用和日志收集组件升级和维护且会导致镜像过大
2 单独创建一个日志收集组件跟app的容器一起运行在同一个pod中 低耦合,扩展性强,方便维护和升级 需要对kubernetes的yaml文件进行单独配置,略显繁琐
3 将所有的Pod的日志都挂载到宿主机上,每台主机上单独起一个日志收集Pod 完全解耦,性能最高,管理起来最方便 需要统一日志收集规则,目录和输出方式
4 将所有的Pod的日志挂载到nfs-server上,在nfs-server上单独启一个日志收集Pod 是方案3的升级版 需要统一日志收集规则,目录和输出方式,如果日志收集pod失效,会导致整个日志收集停止,影响范围比方案3大

综合以上优缺点,我们选择使用方案三。

统一日志格式:

日志类型 描述 日志包含的因子 日志格式 备注
biz日志 用于业务中打印的info/warn 全局链路线程号、消费者IP、提供者IP、调用者应用名称、消费者应用名称、客户号、接口名称、具体日志输出的所在行、日志打印的内容 仅在代码中logger.info/warn输出才会打印
err 日志 用于业务输出的error 全局链路线程号、消费者IP、提供者IP、调用者应用名称、消费者应用名称、客户号、接口名称、具体日志输出的所在行、日志打印的堆栈信息 仅在代码中logger.error输出才会打印
io 日志 用于接口的输入输出参数 全局链路线程号、消费者IP、提供者IP、调用者应用名称、消费者应用名称、客户号、接口名称、具体日志输出的所在行、输入输出的参数、接口耗时 仅配置了注解,出入参才不会打印
monitor日志 用于监控接口耗时、接口数据量 全局链路线程号、消费者IP、提供者IP、调用者应用名称、消费者应用名称、客户号、接口名称、具体日志输出的所在行、数据量、接口耗时、结果标识(成功 or 失败 ) 默认全部打印

需要解决的问题:

1、日志历史归档压缩(未做)

2、日志敏感数据脱敏(未做)

3、异步日志打印

目前已对日志格式做了调整,增加了异步日志,且将日志文件放到nacos上,可动态配置。

日志打印时间 | 线程号 | traceId | [调用方IP | 调用方 | 提供方IP | 提供方] | [请求url] | 日志打印类 [日志打印行号] - 日志内容

Filebeat配置:
vim /opt/filebeat-6.8.9-linux-x86_64/filebeat-all.yml
文件内容如下:

fields:
  ip_inside: 10.210.100.141
fields_under_root: true
filebeat.config.inputs:
  enabled: true
  path: /opt/filebeat-6.8.9-linux-x86_64/conf/*/*.yml
  reload.enabled: true
  reload.period: 10s
logging.files:
  keepfiles: 3
  name: filebeat
  path: /opt/filebeat-6.8.9-linux-x86_64/log/ 
logging.level: info
logging.to_files: true
output.kafka:
  bluk_max_size: 4096
  compression: gzip
  compression_level: 7
  hosts:
    - 10.210.98.112:9092
  partition.round_robin:
    reachable_only: true 
  required_acks: 1
  timeout: 45
  topic: 'bi-sit'
  worker: 1
processors:
 - drop_fields:
     fields: ["prospector","beat","host","source","log"]
queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

参数说明:

序号 参数 说明
1 fields.ip_inside 存储日志的宿主机IP
2 filebeat.config.inputs.path 宿主机日志采集配置文件,统一在/opt/filebeat-6.8.9-linux-x86_64/conf文件夹下
3 output.kafka.topic 日志在Kafka中使用的Topic
4 output.kafka.hosts Kafka宿主机IP和Port,检测当前日志宿主机与Kafka宿主机是否可正常通信:telnet 10.210.98.112 9092

日志采集配置:
mkdir /opt/filebeat-6.8.9-linux-x86_64/conf
新建各个级别日志采集配置文件
以bi系统测试环境为例:新建文件夹:bi-sit
mkdir /opt/filebeat-6.8.9-linux-x86_64/conf/bi-sit
新建info日志采集配置文件bqx-bi-info.yml,文件内容如下:

- enabled: true
  exclude_files:
  - .gz$
  fields:
    log_type: err
    server_name: bqx-bi
    tags : bi-sit
  fields_under_root: true
  multiline.match: after
  multiline.negate: true
  multiline.pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} 
  paths:
  -  /u01/bqx-bi/logs/info/bqx-bi-service-info*.log 
  type: log

参数说明:

序号 参数 说明
1 fields.server_name 服务名称
2 fields.tags 服务标签
3 paths 对应级别日志所在的路径

Logstash配置:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  kafka{
        bootstrap_servers => ["10.210.3.68:9092"]
        client_id => "logger"
    group_id => "logger"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["logger"]
        codec => json {
                         charset => "UTF-8" }
  }
 
  kafka{
        bootstrap_servers => ["10.210.3.68:9092"]
        client_id => "activity"
        group_id => "activity"
        auto_offset_reset => "latest"
        consumer_threads => 1 
        decorate_events => true
        topics => ["activity"]
        codec => json {
                         charset => "UTF-8" }
  }

  kafka{
        bootstrap_servers => ["10.210.3.68:9092"]
        client_id => "mysqlslowlog"
        group_id => "mysqlslowlog"
        auto_offset_reset => "latest"
        consumer_threads => 1 
        decorate_events => true
        topics => ["mysqlslowlog"]
        codec => json {
                         charset => "UTF-8" }
  }
  
   kafka{
        bootstrap_servers => ["10.210.3.68:9092"]
        client_id => "bi-prd"
        group_id => "bi-prd"
        auto_offset_reset => "latest"
        consumer_threads => 2 
        decorate_events => true
        topics => ["bi-prd"]
        codec => json {
                         charset => "UTF-8" }
  }


}


filter {
        grok{
           match => ["message","(?<logdate>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})",
                     "message","# Time: (?<time>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})"
           ]
           }
        date{
           match => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS"]
           locale => "en"
           timezone => "Asia/Shanghai"
        }
       mutate{
           remove_field => ["logdate","@metadata"]
       }
}

output {
  elasticsearch {
    hosts => ["http://10.210.3.68:9200"]
    index => "%{[tags]}-log-%{+YYYY.MM.dd}"
    user => "es用户名"
    password => "es密码"
    #ssl => true
    #ssl_certificate_verification => false
  }
}

output中user和password,如果有用户名和密码的话,需要设置,没有可以去掉

针对过期的index需要通过定时任务将对应的index删除掉
脚本如下:

#!/bin/bash

#删除早于15天的ES集群的索引
function delete_indices() {
param=$(echo $1)
#截取索引的日期部分(用于下面的日期比较是否小于15日),我的索引是com-字符串后面的部分为日期, 比如: www.test.com-2020.08.08
dateValue=$(echo ${param#*com-})
#截取日期的前部分作为索引的名称(后续需要替换-为., 然后和日期拼接起来成为一个真正的索引名称,用于删除)
name=$(echo  $2)

echo "name=$name date=$dateValue" 
comp_date=`date -d "15 day ago" +"%Y-%m-%d"`
date1="$dateValue 00:00:00"
date2="$comp_date 00:00:00"

t1=`date -d "$date1" +%s` 
t2=`date -d "$date2" +%s`

if [ $t1 -le $t2 ]; then
echo "$1时间早于$comp_date,进行索引删除"
#转换一下格式,将类似www-test-com格式转化为www.test.com

#转换一下格式,将类似2020-10-01格式转化为2020.10.01
format_date=`echo $dateValue| sed 's/-/\./g'`
#拼接成索引名称
indexName="$name-$format_date"

curl -u admin:admin -k -XDELETE http://10.210.3.68:9200/$indexName
#删除索引
echo "$indexName删除成功"
fi
}

curl -u admin:admin -k http://10.210.3.68:9200/_cat/indices?v | awk -F" " '{print $3}' |egrep prod|awk -F"-" '{print $NF}' | egrep "[0-9]*\.[0-9]*\.[0-
9]*" | sort | uniq  | sed 's/\./-/g'| while read LINE
do
#调用索引删除函数, 结果打印到日志
delete_indices $LINE prod-log >> /home/logs/delete_indices.log
done

curl -u admin:admin -k http://10.210.3.68:9200/_cat/indices?v | awk -F" " '{print $3}' |egrep activity|awk -F"-" '{print $NF}' | egrep "[0-9]*\.[0-9]*\
.[0-9]*" | sort | uniq  | sed 's/\./-/g'| while read LINE
do
#调用索引删除函数, 结果打印到日志
delete_indices $LINE activity-log >> /home/logs/delete_indices.log
done

匹配逻辑需要根据你自己的索引规范来做适当调整

举报

相关推荐

0 条评论