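1. Download the latest Kafka release from the official website (kafka_2.12-3.6.0 in these notes). ZooKeeper is bundled with it. Extract the archive directly to the D: drive.
2. Edit zookeeper.properties in the config directory and set the ZooKeeper data directory: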
dataDir=D:/kafka_2.12-3.6.0/tmp/zookeeper

3. Edit Kafka's configuration file server.properties. The main changes are:
zookeeper.connect=localhost:2181
log.dirs=D:\\kafka_2.12-3.6.0\\logs
listeners=PLAINTEXT://localhost:9092
Leave everything else at its default.
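The doubled backslashes in log.dirs are easy to get wrong, so here is a minimal sketch of why they are needed. It assumes the broker parses server.properties with the standard java.util.Properties rules, where a backslash is an escape character. The class name LogDirsEscaping and the helper logDirs are made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class LogDirsEscaping {
    // Parses one line of .properties text and returns the value stored under log.dirs.
    static String logDirs(String propertiesLine) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesLine));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("log.dirs");
    }

    public static void main(String[] args) {
        // Java string literals need their own escaping: "\\\\" is two backslashes in the file text.
        System.out.println(logDirs("log.dirs=D:\\\\kafka_2.12-3.6.0\\\\logs")); // doubled backslashes in the file
        System.out.println(logDirs("log.dirs=D:\\kafka_2.12-3.6.0\\logs"));     // single backslashes in the file
        System.out.println(logDirs("log.dirs=D:/kafka_2.12-3.6.0/logs"));       // forward slashes in the file
    }
}
```

With single backslashes the separators are silently dropped during parsing, so either double them or use forward slashes, as the dataDir setting in step 2 does.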
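4. After saving the changes, open the bin\windows directory (the .bat scripts live there, which is why the config paths start with ../../). Start ZooKeeper first, then Kafka, each in its own command prompt window:
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties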
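Before moving on, it can help to confirm that both processes are actually listening. The following is a small sketch using only the JDK; it checks the ports configured above (2181 for ZooKeeper, 9092 for Kafka). The class name PortCheck and the 1000 ms timeout are arbitrary choices for this illustration, and an open port only shows that something is listening, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host all count as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("zookeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```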
5. Create a topic from the command line:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic hello
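6. Start a console producer and a console consumer, each in its own window, to test. Messages typed into the producer show up in the consumer. (--bootstrap-server replaces the deprecated --broker-list option of the producer.)
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic hello

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello --from-beginning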

7. Create a Spring Boot project to test.
Add the dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
8. Configure Kafka in the yml file:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: 1
      retries: 3
      batch-size: 16384
      properties:
        linger:
          ms: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: helloGroup
      enable-auto-commit: false
      # has no effect while enable-auto-commit is false
      auto-commit-interval: 1000
      auto-offset-reset: latest
      properties:
        request:
          timeout:
            ms: 18000
        session:
          timeout:
            ms: 12000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
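The nested entries under properties end up as ordinary dotted Kafka client settings such as linger.ms and session.timeout.ms. The sketch below illustrates that flattening with plain maps; it is not Spring Boot's actual binding code, and the class name YamlFlatten is made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class YamlFlatten {
    // Joins nested map keys with dots, e.g. {linger={ms=0}} becomes {linger.ms=0}.
    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flattenInto(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the consumer "properties" block from the yml above.
        Map<String, Object> consumerProps = Map.of(
                "request", Map.of("timeout", Map.of("ms", 18000)),
                "session", Map.of("timeout", Map.of("ms", 12000)));
        // Prints the two dotted keys request.timeout.ms and session.timeout.ms (map order may vary).
        System.out.println(flatten(consumerProps));
    }
}
```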
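9. Send messages with Spring Boot's KafkaTemplate:
// Assumes the enclosing @RestController has an injected field:
// @Autowired private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
public String sendMessage(String message) {
    // send() is asynchronous: it returns before the broker acknowledges the record
    kafkaTemplate.send("hello", message);
    return "Message sent~";
}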
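To exercise the endpoint without a browser, a small JDK-only client along these lines could be used (Java 11 or later). It assumes the application runs on Spring Boot's default port 8080 on the same machine; the class name SendMessageClient and the sample text "hello kafka" are made up for this sketch.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class SendMessageClient {
    // Assumption: the Spring Boot application listens on the default port 8080.
    static final String BASE_URL = "http://localhost:8080";

    // Builds the request URI, URL-encoding the message so spaces and non-ASCII text survive.
    static URI sendMessageUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/sendMessage?message=" + encoded);
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(sendMessageUri("hello kafka")).GET().build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException | InterruptedException e) {
            System.out.println("request failed (is the application running?): " + e);
        }
    }
}
```

If the call succeeds, the message should also appear in the console consumer from step 6, since both read the hello topic.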
10. Consume messages:
// Must be declared in a Spring-managed bean (for example a @Component);
// the consumer group comes from group-id in the yml (helloGroup).
@KafkaListener(topics = "hello")
public void receiveMessage(ConsumerRecord<String, String> record) {
    String topic = record.topic();
    long offset = record.offset();
    int partition = record.partition();
    String message = record.value();
    System.out.println("topic = " + topic);
    System.out.println("offset = " + offset);
    System.out.println("partition = " + partition);
    System.out.println("message = " + message);
}










