0
点赞
收藏
分享

微信扫一扫

SpringBoot 简单使用 Kafka 消息队列功能

花明 2021-09-21 阅读 107

一、Docker 安装 Kafka 单机版

1、下载镜像

docker pull wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.12-2.3.0

2、启动 zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

3、启动 kafka 容器

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.9:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.9:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

这里主要设置了4个参数
KAFKA_BROKER_ID=0
KAFKA_ZOOKEEPER_CONNECT=192.168.1.9:2181
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.9:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
中间两个参数的192.168.1.9改为宿主机器的 IP 地址,如果不这么设置,可能会导致在别的机器上访问不到 kafka 。

4、测试 kafka

启动消息发送者

#进入 kafka 容器的命令行
docker exec -it kafka /bin/bash

#进入kafka所在目录
cd opt/kafka_2.12-2.3.0/

启动消息发送者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest

再新建一个主机会话,用来启动消费者

#进入 kafka 容器的命令行
docker exec -it kafka /bin/bash

#进入kafka所在目录
cd opt/kafka_2.12-2.3.0/

启动消息消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkaTest --from-beginning

在消息发送发送者命令行输入123456,然后再在消息接收者查看,看到123456 消息则代表 kafka 单机版搭建成功。

二、SpringBoot 项目中简单使用

1、引入 maven 依赖,版本参考官方

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

2、配置 yml 文件

spring:
kafka:
# kafka服务器地址(可以多个)
bootstrap-servers: 192.168.1.9:9092
consumer:
# 指定一个默认的组名
group-id: kafkaGroup
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取字节数
batch-size: 65536
# 缓存容量
buffer-memory: 524288
# 服务器地址
bootstrap-servers: 192.168.1.9:9092

3、写一个简单的 Controller ,里面包含发送消息和接收消息

@EnableAutoConfiguration
@RestController
@Slf4j
public class Controller {

/**
* 注入kafkaTemplate
*/

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

private static final String TOPIC = "testTopic";

@RequestMapping("/kafkaSend")
public String testKafkaSend() {
for (int i = 1; i < 6; i++) {
kafkaTemplate.send(TOPIC,"key" + i, "data" + Math.random());
}
return "success";
}


/**
* 消费者监听消息
*/

@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<?, ?> consumer) {
log.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
}

}

4、进行测试,发送请求http://localhost:8080/kafkaSend,可看到控制台打印如下:

举报

相关推荐

0 条评论