0
点赞
收藏
分享

微信扫一扫

ELK+Kafka(高并发网站日志收集分析平台)

kafka:高吞吐量的分布式发布订阅消息

kafka的特性:

1.通过磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

2.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

3.支持通过Kafka服务器和消费机集群来分区消息。

Kafka与ELK业务层可以直接写入到kafka队列中,不用担心elasticsearch的写入效率问题。

ELK+Kafka(高并发网站日志收集分析平台)_kafka

实验部署:

架构图

ELK+Kafka(高并发网站日志收集分析平台)_kafka_02

nginx-proxy(略)
192.168.0.110 es1
192.168.0.1411 es2
kafka1 192.168.0.112
kafka2 192.168.0.113
kafka3 192.168.0.115
webserver 192.168.0.114

部署步骤:

1.ES集群安装配置

1.安装java-1.8.0以及依赖包(每台服务器都安装JAVA)
yum install -y epel-release
yum install -y java-1.8.0 git wget lrzsz
缓存该包
2.获取es软件包
wget https://download.elastic,co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -zxvf elasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3/usr/loc al/elasticsearch
3.修改配置文件
vim /usr/localelasticsearch/config/elasticsearch.yml
32 cluster.name: es-cluster #组播的名称地址
40 node.name: "es-node1" #节点名称,不能和其他节点重复
47 node.master: true #节点能否被选举为master
51 node.data: true #节点是否存储数据
107 index.number_of_shards: 5 #索引分片的个数
111 index.number_of_replicas: 1 #分片的副本个数
145 path.conf: / usr/local/elasticsearch/config #配置文件的路径
149 path.data:/data/es/data #数据目录路径
159 path.work: /data/es/worker #工作目录路径
163 path.logs: /usr/local/elasticsearch/logs #日志文件路径
167 path.plugins: /data/es/plugins #插件路径
184 bootstrap.mlockall: true #内存不向swap交换
232 http.enabled: true #启用http
4.创建相关文件
mkdir /data/es/{data,worker,plugins] -p
5.获取es服务管理脚本
git clone https://github.com/elastic/elasticsearch-servicewrapper.git
mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
/usr/local/elasticsearch/bin/service/elasticsearch install
6.启动es,并检查其服务是否正常
netstat -nlpt | grep -E 9200|9300
测试∶浏览器访问http://192.168.0.110:9200/
注意:在es1装插件
[ root @ es1 local ] # /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

2.Logstash客户端安装配置

web-server1
说明∶采集日志
1.downloads软件包
yum install -y java-1.8.0 注意,Logstash是需要依赖java环境的,所以这里还是需要yum install -y java-1.8.0
wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
tar -xf logstash-2.0.0.tar.gz -C /usr/local
cd /usr/local/
[ root@webserver1 local ] # ln -sv logstash-2.0.0 logstash
[ root@webserver1 local ] # mkdir /usr/local/logstash/logs,etc}
2.Logstash向es集群写数据(测试)
#编写配置文件
vim /usr/local/logstash/etc/logstash.conf
input {
#数据的输入从标准输入
stdin { }
output { #数据的输出我们指向了es集群
elasticsearch {
hosts => [ "192.168.0.110:9200","192.168.0.111:9200" ] #es主机的ip及端口
}
}
#检查配置文件
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf --configtest --verbose
#启动logstash
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc /logstash.conf
3.下面演示一下如何收集系统日志(测试)(略)
#编写配置文件
vim /usr/local/logstash/etc/logstash2.conf
input {
file {
path => "/var/log/messages” #这是日志文件的绝对路径
start_position => "beginning" #这个表示从messages 的第一行读取,即文件开始处
}
}
output {
elasticsearch {
hosts => [ "192.168.0.110:9200", "192.168.0.111:9200"]
index => "system-messages-%[+YYYY-MM}” #这里将按照这个索引格式来创建索引
}
}
#启动logstash
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc /logstash2.conf

3.Kafka集群安装配置

提示:安装kafka集群时,需要提前安装zoqkeeper集群,当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了
1.获取软件包
提示:官网: http://kafka.apache.org
yum install -y java-1.8.0
wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
cd /usr/local
ln -sv kafka_2.11-0.8.2.1 kafka
2.配置zookeeper集群
[ root @ kafka1 ~ ] # vim/usr/loc al/kafka/config/zookeeper.properties
dataDir=/data/zookeeperclientPort=2181
tickTime=2000 #tickTime :这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳。
initLimit=20
#initLimit : LF初始通信时限
集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量
此配置表示,允许 follower (相对于leader而言的“客户端”)连接并同步到 leader 的初始化连接时间,超过ticktime的倍数则失败
syncLimit=10
#initLimit : LF同步通信时限
集群中的follower服务器(F)与leader服务器(L)之间请求和应答时能容忍的最多心跳数(tickTime的数量
此配置表示,允许 follower (相对于leader而言的“客户端”)连接并同步到 leader 的初始化连接时间,超过ticktime的倍数则失败
server.2=192.168.0.112:2888:3888
server.3=192.168.0.113:2888:3888
server.4=192.168.0.115:2888:3888
maxClientCnxns=0 #0为随便连接
3.创建zookeeper所需要的目录
[ root @ kafka1 ~ ]# mkdir /data/zookeeper
4.创建myid文件
提示:myid里面的内容为数字,用于标识主机,如果这个文件没有的话,zookeeper是没法启动
[ root @ kafka1 ~ ] # echo 2 > /data/zookeeper/ myid #另外两台,分别是3,4
5.kafka配置
[ root @ kafka1 ~ ] # vim /usr/local/kafka/config/server.properties
broker.id=2 #每台不一样
prot=9092
host.name=192.168.0.112 #唯一,填服务器IP(自身)
log.dir=/data/kafka-logs
zookeeper.connect=192.168.0.112:2181,192.168.0.113:2181,192.168.0.115:2181
num.partitions=16
log.dirs=/data/kafka-logs
log.retention.hours =168o
6.启动,先启动zookeeper集群,再启动kafka
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper停止的命令
7.检查zookeeper集群
[ root @ kafka1 ~ ] # netstat -nlpt | grep -E "2181|2888|3888"
8.启动kafka
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
/usr/local/kafka/bin/kafka-server-stop.sh #kafka停止的命令
下面我们将webserver1上面的logstash的辑出改到kafka上面(测试)
(1修改webserver1上面的logstash配置
[root @ webserver1~] # vim /usr/local/logstash/etc/logstash3.conf
input {
file {
type = > "system-message"
path => "/var/log/messages"
start_position => "beginning"}
}
}
output{
kafka {
bootstrap_servers =>
"192.168.0.112:9092,192.168.0.113:9092,192.168.0.115:9092-
topic_id => "system-messages”#这个将作为主题的名称,将会自动创建
compression_type => "snappy”#压缩类型
(2)配置检测
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf --configtest --verbose
(3)启动Logstash
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf
(4)验证数据是否写入到kafka,检查是否生成一个system-messages的主题
[ root@kafka1 ~ ]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.0.113:2181
输出信息
summer
system - messages#可以看到这个主题已经生成了
(5)查看system-messages主题的详情
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.0.113:2181--topic system-messages
9.kafka集群部署lohstash
kafka1&2&3安装logstash
wget https://download.elastic.co/logstash/ logstash/logstash-2.0.0.tar.gz
tar -xf logstash-2.0.0.tar.gz -C /usr/local
cd /usr/loc al/
ln -sv logstash-2.0.0 logstash
mkdir /usr/local/logstash/{logs,etc}
三台kafak编写logstash配置文件
vim /usr/local/logstash/etc/logstash.conf
input {kafka {
zk_connect => "192.168.0.112:2181,192.168.0.113:2181,192.168.0.115:2181” #消费者们
topic_id = > "system-messages"
codec => plain
reset_beginning = > false
consumer_threads => 5
decorate_events = > true
}
}
output {
elasticsearch {
hosts =>["192.168.0.110:9200","192.168.0.111:9200"]
index => "test-system-messages-%{+YYY-MM)”#区分之前实验,新名字“test-system-messages-%{(+YYY-MM}"
}
webserver1上写入测试内容
>/var/log/messages #清空
echo "我将通过kafka集群达到es集群1234" >> /var/log/messages
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf
三台kafka启动logstash (注意顺序1>2>3 )
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf
/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf


举报

相关推荐

0 条评论