0
点赞
收藏
分享

微信扫一扫

7.【kafka运维】 kafka-consumer-groups.sh消费者组管理



文章目录

  • ​​消费者组管理 kafka-consumer-groups.sh​​
  • ​​1. 查看消费者列表`--list`​​
  • ​​2. 查看消费者组详情`--describe`​​
  • ​​3. 删除消费者组`--delete`​​
  • ​​4. 重置消费组的偏移量 `--reset-offsets`​​
  • ​​5. 删除偏移量`delete-offsets`​​
  • ​​More​​


日常运维问题排查 怎么能够少了滴滴开源的
滴滴开源LogiKM一站式Kafka监控与管控平台


消费者组管理 kafka-consumer-groups.sh

1. 查看消费者列表​​--list​


​sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list ​7.【kafka运维】 kafka-consumer-groups.sh消费者组管理_重置


先调用​​MetadataRequest​​拿到所有在线Broker列表

再给每个Broker发送​​ListGroupsRequest​​请求获取 消费者组数据

2. 查看消费者组详情​​--describe​

​DescribeGroupsRequest​

查看消费组详情--group​--all-groups​


查看指定消费组详情--group​​
​sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group​

查看所有消费组详情--all-groups​​
​sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups​​ 查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息
7.【kafka运维】 kafka-consumer-groups.sh消费者组管理_重置_02


查询消费者成员信息--members


所有消费组成员信息
​sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090​指定消费组成员信息
​sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090 ​7.【kafka运维】 kafka-consumer-groups.sh消费者组管理_重置_03


查询消费者状态信息--state


所有消费组状态信息
​sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090​指定消费组状态信息
​sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090​7.【kafka运维】 kafka-consumer-groups.sh消费者组管理_bootstrap_04


3. 删除消费者组​​--delete​

​DeleteGroupsRequest​

删除消费组–delete


删除指定消费组--group​​
​sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090​删除所有消费组--all-groups​​
​ sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090​


PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消费组的偏移量 ​​--reset-offsets​

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: ​​--dry-run​​ ;这个参数表示预执行,会打印出来将要处理的结果;

等你想真正执行的时候请换成参数​​--excute​​ ;

下面示例 重置模式都是 ​​--to-earliest​​ 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group


重置指定消费组的所有Topic的偏移量--all-topic​​
​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic​重置指定消费组的指定Topic的偏移量--topic​​
​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2​


重置所有消费组的偏移量 --all-group


重置所有消费组的所有Topic的偏移量--all-topic​​
​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic​重置所有消费组中指定Topic的偏移量--topic​​
​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2​


​--reset-offsets​​ 后面需要接重置的模式

相关重置Offset的模式

参数

描述

例子

--to-earliest :

重置offset到最开始的那条offset(找到还未被删除最早的那个offset)

--to-current:

直接重置offset到当前的offset,也就是LOE

--to-latest

重置到最后一个offset

--to-datetime:

重置到指定时间的offset;格式为:​​YYYY-MM-DDTHH:mm:SS.sss​​;

​ --to-datetime "2021-6-26T00:00:00.000"​

​--to-offset​

重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<​​--to-offset​​​, 这个时候重置为目标最大offset;2.目标最小offset>​​--to-offset​​​ ,则重置为最小; 3.否则的话才会重置为​​--to-offset​​的目标值; 一般不用这个

​--to-offset 3465​​​ 7.【kafka运维】 kafka-consumer-groups.sh消费者组管理_kafka_05

​--shift-by ​

按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的;

​--shift-by 100​​​ 、​​--shift-by -100​

​--from-file​

根据CVS文档来重置; 这里下面单独讲解

--from-file着重讲解一下


上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**​​--from-file​​**可以让我们更灵活一点;


  1. 先配置cvs文档
    格式为: Topic:分区号: 重置目标偏移量
test2,0,100
test2,1,200
test2,2,300
  1. 执行命令
  2. ​sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv​

5. 删除偏移量​​delete-offsets​

能够执行成功的一个前提是 消费组这会是不可用状态;

偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;


​sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2​


相关可选参数

参数

描述

例子

​--bootstrap-server ​

指定连接到的kafka服务;

–bootstrap-server localhost:9092

​--list ​

列出所有消费组名称

​--list ​

​--describe​

查询消费者描述信息

​--describe​

​--group​

指定消费组

​--all-groups​

指定所有消费组

​--members​

查询消费组的成员信息

​--state​

查询消费者的状态信息

​--offsets​

在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的;

​dry-run​

重置偏移量的时候,使用这个参数可以让你预先看到重置情况,这个时候还没有真正的执行,真正执行换成​​--excute​​​;默认为​​dry-run​

​--excute​

真正的执行重置偏移量的操作;

​--to-earliest​

将offset重置到最早

​to-latest​

将offset重置到最近

More

Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频)

​​【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)_石臻臻的杂货铺-CSDN博客​​

​​【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)​​

​​【kafka异常】kafka 常见异常处理方案(持续更新! 建议收藏)​​

​​【kafka运维】分区从分配、数据迁移、副本扩缩容 (附教学视频)​​

​​【kafka源码】ReassignPartitionsCommand源码分析(副本扩缩、数据迁移、副本重分配、副本跨路径迁移​​

​​【kafka】点击更多…​​



举报

相关推荐

0 条评论