RocketMQ Broadcast Mode

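Series

  • RocketMQ Message Tag Filtering
  • RocketMQ Broadcast Mode
  • New Features of RocketMQ Synchronous Calls
  • RocketMQ Transactional Messages
  • RocketMQ Backlog Queries
  • RocketMQ Message Queries
  • RocketMQ Consumer Offset Reporting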

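Introduction

  • This series explains how some commonly used RocketMQ features are implemented, including message filtering, broadcast-mode consumption, transactional messages, and synchronous calls.

  • This article covers the implementation of broadcast-mode consumption. It differs from clustering mode in two key ways: (1) in broadcast mode the consumer offset is stored in a local file; (2) during rebalance, the MessageQueues are not divided among the consumers of the same consumer group; instead, every consumer consumes all MessageQueues.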


Broadcast mode example

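import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
// MessageModel lives in this package in RocketMQ 4.x clients.
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_test");
        consumer.setNamesrvAddr("localhost:9876");
        // Broadcast mode: every consumer in the group receives every message.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to TopicTest, keeping only messages tagged A or B.
        consumer.subscribe("TopicTest", "A|B");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}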
  • Broadcast mode is enabled by calling consumer.setMessageModel(MessageModel.BROADCASTING).


The consumer's local offset store

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // In broadcast mode, offsets are stored in a local file
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
}
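  • Unless a custom OffsetStore has been set on the consumer, broadcast mode stores consumer offsets in a local file through LocalFileOffsetStore, while clustering mode stores them on the broker through RemoteBrokerOffsetStore.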


public class LocalFileOffsetStore implements OffsetStore {
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    private final static InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String groupName;
    private final String storePath;
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }
}
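  • The path used by LocalFileOffsetStore follows the rule shown above: <LOCAL_OFFSET_STORE_DIR>/<clientId>/<groupName>/offsets.json. LOCAL_OFFSET_STORE_DIR defaults to .rocketmq_offsets under the user's home directory and can be overridden with the rocketmq.client.localOffsetStoreDir system property.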
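
The path rule can be sketched with plain JDK classes. This is a minimal illustration rather than the RocketMQ implementation: the class OffsetPathSketch and its two methods are hypothetical names, and the client ID and group passed in main are sample values only.

```java
import java.io.File;

// Hypothetical helper that mirrors how LocalFileOffsetStore assembles its path.
final class OffsetPathSketch {

    // Same property name and default as LOCAL_OFFSET_STORE_DIR in the listing above.
    static String baseDir() {
        return System.getProperty(
            "rocketmq.client.localOffsetStoreDir",
            System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    }

    // Builds <baseDir>/<clientId>/<groupName>/offsets.json
    static String storePath(String baseDir, String clientId, String groupName) {
        return baseDir + File.separator
            + clientId + File.separator
            + groupName + File.separator
            + "offsets.json";
    }

    public static void main(String[] args) {
        // Sample values; a real client ID comes from MQClientInstance.getClientId().
        System.out.println(storePath(baseDir(), "192.168.0.8@DEFAULT", "consumer_group_test"));
    }
}
```

Because the client ID is part of the path, two consumers share an offset file only when both the client ID and the group name match. In the start() listing, changeInstanceNameToPID() is called only for CLUSTERING, so a broadcast consumer keeps its configured instance name in the client ID.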


Locally stored consumer offsets

cat /Users/lebron374/.rocketmq_offsets/192.168.0.8\@DEFAULT/consumer_group_test/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":7,
            "topic":"TopicTest"
        }:1123,{
            "brokerName":"broker-a",
            "queueId":6,
            "topic":"TopicTest"
        }:1119,{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1121,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1116,{
            "brokerName":"broker-a",
            "queueId":5,
            "topic":"TopicTest"
        }:1114,{
            "brokerName":"broker-a",
            "queueId":4,
            "topic":"TopicTest"
        }:1119,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1141,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1139
    }
}
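  • In the local offset file, each key is a serialized MessageQueue object and each value is the consumer offset for that queue. Because the keys are objects rather than strings, the file is not strictly valid JSON.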
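
The key/value shape of the file can be modeled in memory as a map from a queue identity to an offset. The sketch below rests on assumptions: QueueKey and OffsetTableSketch are hypothetical stand-ins for MessageQueue and the store's offsetTable field, and the increase-only update is one plausible policy for illustration, not a copy of the RocketMQ code.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for MessageQueue: identity is (topic, brokerName, queueId).
final class QueueKey {
    final String topic;
    final String brokerName;
    final int queueId;

    QueueKey(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof QueueKey)) return false;
        QueueKey other = (QueueKey) o;
        return queueId == other.queueId
            && topic.equals(other.topic)
            && brokerName.equals(other.brokerName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, brokerName, queueId);
    }
}

// Hypothetical in-memory offset table keyed by queue, like the offsetTable field above.
final class OffsetTableSketch {
    private final ConcurrentMap<QueueKey, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Records an offset; with increaseOnly set, a smaller value never overwrites a larger one.
    void update(QueueKey key, long offset, boolean increaseOnly) {
        AtomicLong current = offsetTable.computeIfAbsent(key, k -> new AtomicLong(offset));
        if (increaseOnly) {
            current.accumulateAndGet(offset, Math::max);
        } else {
            current.set(offset);
        }
    }

    // Returns the stored offset, or -1 when the queue has no entry yet.
    long read(QueueKey key) {
        AtomicLong current = offsetTable.get(key);
        return current == null ? -1L : current.get();
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        QueueKey q7 = new QueueKey("TopicTest", "broker-a", 7);
        table.update(q7, 1123L, true);
        table.update(q7, 1100L, true); // ignored: lower than the stored offset
        System.out.println(table.read(new QueueKey("TopicTest", "broker-a", 7)));
    }
}
```

The key class defines equals and hashCode over all three fields, so a freshly built key for the same topic, broker, and queue ID finds the existing entry.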


Rebalance in broadcast mode

public abstract class RebalanceImpl {

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // No AllocateMessageQueueStrategy is applied here: the full queue set is used as-is
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // code omitted
                break;
            }
            default:
                break;
        }
    }
}
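  • Compared with clustering mode, rebalance in broadcast mode has no MessageQueue allocation step: each consumer is responsible for every MessageQueue of the topics it subscribes to.
  • For the BROADCASTING case, rebalanceByTopic calls updateProcessQueueTableInRebalance directly, passing the topic's full MessageQueue set.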
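
The difference between the two branches can be illustrated without any RocketMQ classes. In this sketch, queues are plain integers, broadcast returns the whole set for every consumer, and clusteredAverage is a simplified contiguous split standing in for an AllocateMessageQueueStrategy. All names are hypothetical, and the split is not the library's actual algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of per-consumer queue assignment in the two message models.
final class AssignmentSketch {

    // Broadcast: every consumer gets all queues, so no allocation step is needed.
    static List<Integer> broadcast(List<Integer> allQueues) {
        return new ArrayList<>(allQueues);
    }

    // Clustering (simplified): split queues into contiguous blocks, one block per consumer.
    // The first (size % consumerCount) consumers receive one extra queue.
    static List<Integer> clusteredAverage(List<Integer> allQueues, int consumerIndex, int consumerCount) {
        int size = allQueues.size();
        int base = size / consumerCount;
        int extra = size % consumerCount;
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        int length = base + (consumerIndex < extra ? 1 : 0);
        return new ArrayList<>(allQueues.subList(start, start + length));
    }

    public static void main(String[] args) {
        List<Integer> queues = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            queues.add(i);
        }
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer " + c
                + " broadcast=" + broadcast(queues)
                + " clustering=" + clusteredAverage(queues, c, 3));
        }
    }
}
```

With 8 queues and 3 consumers, each consumer sees all 8 queues in the broadcast case, while the simplified clustering split hands out 3, 3, and 2 queues with no overlap.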