原代码
public class KafkaConsumerDemo {
public static void main(String[] args) {
int numConsumers = 5; // 增加消费者的数量
for (int i = 0; i < numConsumers; i++) {
new Thread(new KafkaConsumerThread()).start();
}
}
static class KafkaConsumerThread implements Runnable {
@Override
public void run() {
// 配置消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "114.15.78.14:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 调整消费者配置
props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据
props.put("fetch.max.wait.ms", "500"); // 最大等待500ms
// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 批量提交偏移量
consumer.commitSync();
}
}
}
}
}
public class KafkaConsumerDemo {
public static void main(String[] args) {
int numConsumers = 5; // 增加消费者的数量
for (int i = 0; i < numConsumers; i++) {
new Thread(new KafkaConsumerThread()).start();
}
}
static class KafkaConsumerThread implements Runnable {
@Override
public void run() {
// 配置消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "114.15.78.14:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 调整消费者配置
props.put("fetch.min.bytes", "1"); // 减少最小获取字节数
props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间
props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数
// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
processRecords(records); // 异步处理消息
consumer.commitAsync(); // 异步提交偏移量
}
}
}
private void processRecords(ConsumerRecords<String, String> records) {
// 异步处理消息的逻辑
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 这里可以添加消息处理逻辑,例如使用线程池并行处理
}
}
}
}