0
点赞
收藏
分享

微信扫一扫

RocketMQ - 消费者Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?

RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面讲一下 Rebalancelmpl 的核心属性和方法

RocketMQ - 消费者Rebalance机制_加锁

核心属性

public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
//记录MessageQueue和ProcessQueue的关系。MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
//Topic 路由信息 。保存 Topic 和 MessageQueue的关系。
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
//真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
//消费分配策略的实现
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//client实例对象
protected MQClientInstance mQClientFactory;
}

核心方法

public abstract class RebalanceImpl {
//为MessageQueue加锁
public boolean lock(final MessageQueue mq) {}
//执行Rebalance操作
public void doRebalance(final boolean isOrder) {}
//通知Message发生变化,这个方法在Push和Pull两个类中被重写
public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
//去掉不再需要的 MessageQueue
public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
//执行消息拉取请求
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
//在Rebalance中更新processQueue
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder)

}

Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心实现,主要逻辑都在Rebalancelmpl中,因为Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现中重写了部分方法,以满足自身需求

如果有一个消费者实例下线了,Broker和其他消费者是怎么做Rebalance的呢

RocketMQ - 消费者Rebalance机制_客户端_02

@Override
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}

log.info(this.getServiceName() + " service end");
}

目前队列分配策略有以下5种实现方法

  • AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
  • AllocateMessageQueueAveragelyByCircle:环形分配策略。
  • AllocateMessageQueueByConfig:手动配置。
  • AllocateMessageQueueConsistentHash:一致性Hash分配。
  • AllocateMessageQueueByMachineRoom:机房分配策略



举报

相关推荐

0 条评论