消息中间件之Kafka消费者
- 一、`Consumer`初始化配置
- 二、测试简单的消费消息
- 三、手动提交
- 四、`Consumer Group`
- 六、单个`Partition`提交
- 七、`Consumer`多线程并发处理
- 八、`Consumer`设置`offset`
- 九、`Consumer`限流操作
一、Consumer初始化配置
/**
* producer 配置
* @return
*/
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(properties);
}
二、测试简单的消费消息
这里为了不需要等待,我们异步的开启一个线程去消费消息。
/**
* @author long
*/
@Slf4j
@RestController
@RequestMapping("/consumer")
public class ConsumerController {
@Autowired
private KafkaConsumer kafkaConsumer;
/**
* 简单消费者
*/
@GetMapping("/simple")
public String simple(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
log.info("消费信息:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
}
}, "获取消息线程 => ").start();
return "success";
}
}
启动项目之后,先访问localhost:8080/consumer/simple?topic_name=new_long_topic_34这个接口,让这个异步的线程启动起来。
这时运行发送消息的接口:localhost:8080/producer/send2?topicName=new_long_topic_34&num=100,发送一百条消息。

我们上面的这种消费方法,使用的是自动提交的方法。实际中使用不推荐使用。
三、手动提交
/**
* 手动提交
* @param topicName
* @return
*/
@GetMapping("/commit")
public String commit(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// TODO 处理数据保存到数据库
log.info("消费信息入库:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
// TODO 处理失败的话,则回滚,不要提交offset
}
// TODO 成功的话,手动提交offset
kafkaConsumer.commitAsync();
}
}, "获取消息线程 => ").start();
return "success";
}
四、Consumer Group

这里面我们需要注意:
- 单个分区的消息只能由ConsumerGroup中某个Consumer消费
- ConsumerGroup中的一个Consumer可以对应多个分片
- 一个分片不可以被ConsumerGroup中的多个Consumer消费,多出的只能闲置
- Consumer从Partition中消费消息是顺序,默认从开头开始消费
- 单个ConsumerGroup会消费所有的Partition中的消息。
六、单个Partition提交
可以控制单个partition手动提交,方便使用多线程进行消息消费;并且可以对多个partition提交进行控制。
@GetMapping("/single")
public String singlePartition(@RequestParam("topic_name") String topicName) {
  kafkaConsumer.subscribe(Arrays.asList(topicName));
  new Thread(() -> {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, String>> recordList = records.records(partition);
      for (ConsumerRecord<String, String> record : recordList) {
        log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
      }
      long offset = recordList.get(recordList.size() - 1).offset();
      Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
      offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
      kafkaConsumer.commitSync(offsetMap);
    }
  }, "获取消息线程<single> => ").start();
  return "success";
}针对上面的代码,可以发现对于只包含一个partition的时候,上面的写法是有点啰嗦的,我们还有另一种写法:
@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
  TopicPartition partition = new TopicPartition(topicName, 0);
  // 订阅topic中的某一个partition
  kafkaConsumer.assign(Arrays.asList(partition));
  new Thread(() -> {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    List<ConsumerRecord<String, String>> recordList = records.records(partition);
    for (ConsumerRecord<String, String> record : recordList) {
      log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
    }
    long offset = recordList.get(recordList.size() - 1).offset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
    offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
    kafkaConsumer.commitSync(offsetMap);
  }, "获取消息线程<single> => ").start();
  return "success";
}七、Consumer多线程并发处理
关于多线程并发处理,常用的是这样两种线程模型:

第一种是:对于数据进行异步处理,适用于对于数据一致性要求不高,不是用于处理业务,这种情况就是为了快速消费,不管是不是成功,偏向于日志这种的;
第二种是:一个partition对应一个consumer,多用户处理业务使用。
在上面使用异步线程也是一种多线程使用方式,另一种大家也能猜到肯定是使用线程池(推荐)。
@Bean("longThreadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  // 核心线程数
  poolTaskExecutor.setCorePoolSize(10);
  // 最大线程数
  poolTaskExecutor.setMaxPoolSize(15);
  // 缓冲对列
  poolTaskExecutor.setQueueCapacity(100);
  // 允许线程空闲时间60s
  poolTaskExecutor.setKeepAliveSeconds(60);
  // 线程池前缀
  poolTaskExecutor.setThreadNamePrefix("consumer-pool");
  // 线程池对拒绝任务的处理策略
  poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  // 关闭线程池的时候,是否等待当前任务执行完成
  poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  // 等待当前任务完成的超时时间60秒,否则会造成阻塞
  poolTaskExecutor.setAwaitTerminationSeconds(60);
  poolTaskExecutor.initialize();
  return poolTaskExecutor;
}使用直接注入就可以:
@Autowired
private ThreadPoolTaskExecutor longThreadPool;
可以使用@Async的进行处理,不做演示了,下面演示下对第一种模型的处理:
处理consumer的消费消息
@GetMapping("/simple")
public String simple(@RequestParam("topic_name") String topicName) {
  kafkaConsumer.subscribe(Arrays.asList(topicName));
  new Thread(() -> {
    while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
      for (ConsumerRecord<String, String> record : records) {
        // 对提交的数据进行处理
        longThreadPool.execute(() -> {
          log.info("消费信息:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
        });
      }
    }
  }, "获取消息线程 => ").start();
  return "success";
}八、Consumer设置offset
consumer提供了一个seek的函数,可以设置我们开始的offset位置开始消费。设置offset:多用于回滚和重复消费。
我们本地的消费的最后offset是399,测试的我们从350开始。

@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
  TopicPartition partition = new TopicPartition(topicName, 0);
  // 订阅topic中的某一个partition
  kafkaConsumer.assign(Arrays.asList(partition));
  new Thread(() -> {
    kafkaConsumer.seek(partition, 350);
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    List<ConsumerRecord<String, String>> recordList = records.records(partition);
    for (ConsumerRecord<String, String> record : recordList) {
      log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
    }
    long offset = recordList.get(recordList.size() - 1).offset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
    offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
    kafkaConsumer.commitSync(offsetMap);
  }, "获取消息线程<single> => ").start();
  return "success";
}
从这里我们也可以知道,kafka是不会丢弃消息的。
九、Consumer限流操作
一般情况,系统不会给kafka客户端,提供太多的资源,有时候会出现数据峰值,把kafka打死,所以这个时候限流就很重要了。
一般来说:当处理的数据量达到某个阈值时暂停消费,低于阈值时则恢复消费,这就可以让Consumer保持一定的速率去消费数据,从而避免流量剧增时将Consumer给压垮。
这里我们利用Guava的限流器对Consumer进行限流。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
代码实现:
@GetMapping("/rate_limiter")
public String rateLimiter(@RequestParam("topic_name") String topicName) {
  TopicPartition p0 = new TopicPartition(topicName, 0);
  TopicPartition p1 = new TopicPartition(topicName, 1);
  kafkaConsumer.assign(Arrays.asList(p0, p1));
  new Thread(() -> {
    while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
      if (records.isEmpty()) {
        continue;
      }
      if (!rateLimiter.tryAcquire()) {
        log.warn("无法拿到令牌,开始限流...");
        kafkaConsumer.pause(Arrays.asList(p0, p1));
      } else {
        log.info("拿到令牌,开始消费...");
        kafkaConsumer.resume(Arrays.asList(p0, p1));
      }
      for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> recordList = records.records(partition);
        for (ConsumerRecord<String, String> record : recordList) {
          log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
        }
        long offset = recordList.get(recordList.size() - 1).offset();
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
        offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
        kafkaConsumer.commitSync(offsetMap);
      }
    }
  }, "限流器线程 => ").start();
  return "success";
}                










