参考文章
【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客
Docker搭建kafka+zookeeper
打开我们的docker的镜像源配置
配置
下面的那个insecure是我自己虚拟机的,不用理会
拉取镜像
然后开始拉取我们的zookeeper镜像和我们的kafka镜像
这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本
docker pull zookeeper
kafka镜像
docker pull wurstmeister/kafka
因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络
让不同容器都加入这个网络
docker network create创建我们的网络
然后那个zookeeper_network是我们自定义的网络名称
docker network create --driver bridge zookeeper_network
kafka是依赖于zookeeper的所以我们要先安装zookeeper
我们先用run来创建一个zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
查看我们创建的网络环境的地址
docker inspect zookeeper_network
那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址
我的是12.21.0.2,这个ip地址是要记住方便后面使用的
创建一个kafka容器
这段代码有点长,根子自己改吧
# 启动kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
解释
不知道本机地址可以输入 ip addr来查看本机地址
这样子就搭建完成了
SpringBoot集成kafka
首先就是springboot和kafka的版本兼容了
Spring for Apache Kafka
然后我们引入两个kafka的依赖
自己对着自己的版本来
自己看b站视频,9分钟就搞定了
63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili
然后开始写我们的application.yml配置文件
下面是配置文件的全部+解析
其实和普通mq差不多
也就是配置生产者和消费者和一些过期时间超时时间
重点在于那个missing-topics-fatal
主题不存在的话,我们是否还要成功启动
我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题
所以我启动的时候,报错然后失败了
基本案例
这是常量类
指定了一个topic和group
主题和分组id
groupid是消费者组的唯一标识
这个视频9分钟看懂kafka
小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili
生产者
我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
produces里面指定我们前端传的是json格式
我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"
消费者
@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)
public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
for (String message : messages) {
//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象
final JSONObject entries = JSONUtil.parseObj(message);
System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));
//ack.acknowledge();
}
}
我们用List<String>来接收,因为可能一个消费者接收多条消息
指定消费者监听的主题topic
以及指定消费者的唯一标识GROUP_ID
这些其实都是自己在常量类里面自己写好的
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
这个是得到我们的主题topic的名字
我用apifox调试之后,成功执行了
kafka的图形化工具
这里介绍一个免费的开源项目KafkaKing
Releases · Bronya0/Kafka-King (github.com)
里面还能指定中文