Kafka简介
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
SpringBoot集成Kafka
- 修改config/server.properties文件,在很靠前的位置有listeners和 advertised.listeners两处配置的注释,去掉这两个注释,并且根据当前服务器的IP修改如下:
listeners=PLAINTEXT://ip:9092  #当前服务器的IP
advertised.listeners=PLAINTEXT://ip:9092 #当前服务器的IP
- pom文件
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
- application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.0.197:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- KafkaProducerController
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @RequestMapping("send")
    public String send(String msg){
        logger.info("生产者生产的消息:"+msg);
        kafkaTemplate.send("test_topic", msg);
        return "success";
    }
}
- TestConsumer
@Component
public class TestConsumer {
    @KafkaListener(topics = "test_topic")
    public void listen (ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}












