ELK Log Collection with Kafka: Filebeat + Kafka + Logstash + ES + Kibana

罗子僧 · 2024-12-02

I. Introduction

Common log collection and processing solutions:

Filebeat + ES + Kibana
Filebeat + Logstash + ES + Kibana
Filebeat + Kafka/Redis/File/Console + application (processing/storage/display)
Filebeat + Logstash + Kafka/Redis/File/Console + application (processing/storage/display)

[Figure: architecture of the Filebeat + Kafka + Logstash + ES + Kibana pipeline]

II. Configuration

1. Create the Filebeat configuration file to collect Nginx/Tomcat logs

# Filebeat configuration on server 142
cd /es/softwares/filebeat-7.17.5-linux-x86_64/config
cat >01-nginx-tomcat-to-kafka.yaml<<'EOF'
filebeat.inputs:
# Collect Nginx log files
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log*
    - /var/log/nginx/error.log*
  # Multiline options: lines that do not start with a date are treated
  # as continuations of the previous event (e.g. stack traces)
  multiline.type: pattern
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
# Collect Tomcat log files
- type: log
  enabled: true
  paths:
    - /app/tools/tomcat/logs/localhost_access_log.*.txt
    - /app/tools/tomcat/logs/catalina*
  # Multiline options for Tomcat's date formats
  multiline.type: pattern
  multiline.pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
  multiline.negate: true
  multiline.match: after
# Send the events to Kafka
output.kafka:
  hosts: ["192.168.77.176:9092", "192.168.77.177:9092", "192.168.77.178:9092"]
  topic: "nginx_tomcat_logs_topic"
EOF
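If the brokers do not auto-create topics (auto.create.topics.enable=false), create the topic before starting Filebeat. A minimal sketch, run on any broker with the Kafka scripts on the PATH; the partition and replication counts are illustrative, not taken from the original setup:

kafka-topics.sh --bootstrap-server 192.168.77.176:9092 \
  --create --topic nginx_tomcat_logs_topic \
  --partitions 3 --replication-factor 2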

When starting Filebeat, make sure to specify the correct configuration file:

filebeat -e -c 01-nginx-tomcat-to-kafka.yaml
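Before running it in the foreground with -e, the configuration syntax and the Kafka output connectivity can be sanity-checked with Filebeat's built-in test subcommands:

filebeat test config -c 01-nginx-tomcat-to-kafka.yaml
filebeat test output -c 01-nginx-tomcat-to-kafka.yaml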

2. Verify the data in Kafka

# On server 176, read the topic from the beginning to verify data is arriving
kafka-console-consumer.sh --bootstrap-server 192.168.77.176:9092 --topic nginx_tomcat_logs_topic --from-beginning

[Figure: console consumer output showing the JSON events on the topic]
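Each record on the topic is the JSON envelope Filebeat builds around the original log line; the abridged sketch below is illustrative (field values are placeholders, not captured output). The original line sits in the message field, and the surrounding metadata fields are exactly what the Logstash filter in the next step parses and then removes:

{
  "@timestamp": "2024-12-02T08:00:00.000Z",
  "message": "<original nginx/tomcat log line>",
  "log": { "offset": 0, "file": { "path": "/var/log/nginx/access.log" } },
  "input": { "type": "log" },
  "host": { "name": "node142" },
  "agent": { "type": "filebeat", "version": "7.17.5" },
  "ecs": { "version": "1.12.0" }
}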

3. Configure Logstash to pull data from Kafka and write it to ES

Logstash uses the kafka input plugin to consume from the topic:

# Logstash configuration on server 176
cat >01-kafka-to-es.conf<<'EOF'
input {
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092"
    # Topic(s) to consume
    topics => ["nginx_tomcat_logs_topic"]
    # Consumer group id
    group_id => "nginx_tomcat_logs_topic_group01"
    # Where to start when there is no committed offset:
    # "earliest" reads from the beginning, "latest" from the newest records
    auto_offset_reset => "earliest"
  }
}

filter {
  json {
    # Parse the given field as JSON (the Filebeat envelope)
    source => "message"
  }

  mutate {
    # Drop Filebeat metadata fields we do not need in ES
    remove_field => [ "agent","log","input","host","ecs","tags" ]
  }
}

output {
  # Write the events to the ES cluster
  elasticsearch {
    # ES node addresses
    hosts => ["http://192.168.77.176:9200","http://192.168.77.177:9200","http://192.168.77.178:9200"]
    # Daily index name
    index => "nginx_tomcat_logs_topic-%{+YYYY.MM.dd}"
    # ES credentials
    user => "elastic"
    password => "SfSnnfYPzBTMMTyUbuRa"
  }
}
EOF
# Start Logstash
logstash -f 01-kafka-to-es.conf
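Once Logstash is consuming, the group's committed offsets and lag can be checked from any broker with the standard Kafka tooling (the group id matches the config above):

kafka-consumer-groups.sh --bootstrap-server 192.168.77.176:9092 \
  --describe --group nginx_tomcat_logs_topic_group01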

4. View the data in Elasticsearch

[Figure: the daily index and its documents in Elasticsearch]
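The daily index can also be checked from the command line with the _cat API, reusing the credentials from the Logstash output (the index name follows the pattern configured above):

curl -u elastic:SfSnnfYPzBTMMTyUbuRa \
  'http://192.168.77.176:9200/_cat/indices/nginx_tomcat_logs_topic-*?v'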

5. View the data in Kibana

Create an index pattern (e.g. nginx_tomcat_logs_topic-*)

[Figure: creating the index pattern in Kibana]
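Alternatively, the index pattern can be created via Kibana's saved objects API. A sketch only: it assumes Kibana listens on 192.168.77.176:5601 and accepts the same elastic credentials — adjust both to your deployment:

curl -u elastic:SfSnnfYPzBTMMTyUbuRa \
  -X POST 'http://192.168.77.176:5601/api/saved_objects/index-pattern' \
  -H 'kbn-xsrf: true' -H 'Content-Type: application/json' \
  -d '{"attributes":{"title":"nginx_tomcat_logs_topic-*","timeFieldName":"@timestamp"}}'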

View the data in Discover

[Figure: log events in Kibana Discover]
