0
点赞
收藏
分享

微信扫一扫

RocketMQ源码解析:如何快速查找消息?


RocketMQ源码解析:如何快速查找消息?_java

消息查询介绍

当我们使用MQ的时候,一个比较好的习惯就是把发送的消息和接收到的消息打印到日志中。当我们排查问题的时候就非常方便。

例如B服务需要消费A服务发送的关闭订单的消息。如果一个订单迟迟未关闭,此时可能有三种情况

  1. A服务发送失败
  2. B服务消费失败
  3. B服务没有收到消息

A服务发送失败可以通过日志快速确认。

如果A服务发送成功,此时就可能根据消息的属性快速查找到具体的消息,看这条消息是否被对应的ConsumerGroup消费过,如果没消费此时说明B服务之类的配置可能有问题,导致没有收到消息。如果被消费过说明B服务消费的逻辑可能有问题,查看对应的日志排查即可,问题修复后,可以通过命令或者管控台重新发送消息即可。

那么查找指定消息的方式有几种呢?

前面我们说过,RocketMQ中用QueryMessageProcessor来处理消息查找的请求

RocketMQ源码解析:如何快速查找消息?_rabbitmq_02


可以看到有两种处理方式

  1. QueryMessageProcessor#queryMessage,根据Unique Key和 Message Key查询
  2. QueryMessageProcessor#viewMessageById,根据Message Id查询

Unique Key,Message Key,Message Id又是怎么来的?

Unique Key是在producer端发送消息生成的

// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息

// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息
String messageKey = UUID.randomUUID().toString();
Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey,
("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);

Message Id是在broker端生成的,当将消息写入内存后,返回结果中会设置Message Id(主要内容就是broker的地址和消息写入的偏移量)

// DefaultAppendMessageCallback#doAppend
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};

跑个Demo演示一下

// 指定 topicName 和消息内容,然后发送消息
Message message = new Message(TOPIC_NAME, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);

SendResult
[sendStatus=SEND_OK,
msgId=C0A86130AD2D18B4AAC22CAD6B000000,
offsetMsgId=31E8336700002A9F00000000000A1AAA,
messageQueue=MessageQueue [topic=quickStartTopic, brokerName=broker-a, queueId=15], queueOffset=6]

sendStatus:发送状态
msgId:虽然命名为msgId,实际是Unique Key
offsetMsgId:Broker返回的Message ID
messageQueue:消息发送到哪个队列
queueOffset:消息在队列中的偏移量

此时我们根据msgId和offsetMsgId都能在管控台的MESSAGE ID tab页查找到对应的消息

RocketMQ源码解析:如何快速查找消息?_java_03


也可以根据指定的messageKey在管控台的MESSAGE KEY tab页查找到对应的消息

消息查询可以通过命令行和管控台实现。一般情况下用管控台足够了

按照Message Id查询

通过Message Id查询时客户端会调用到如下方法

org.apache.rocketmq.client.impl.MQAdminImpl#viewMessage

RocketMQ源码解析:如何快速查找消息?_发送消息_04


根据Message Id查找消息的逻辑非常简单,从Message Id中解析出broker的地址和消息的偏移量,发送查找请求(查找请求包含消息的偏移量)到broker,broker收到请求后到commitLog根据消息的偏移量查找到对应的消息返回即可。

调用链路如下

  1. org.apache.rocketmq.client.impl.MQAdminImpl#viewMessage
  2. org.apache.rocketmq.broker.processor.QueryMessageProcessor#viewMessageById
  3. org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long)

根据Unique Key和 Message Key查询

通过Message Id查询时客户端会调用到如下方法

org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)

最终会调用到如下方法
org.apache.rocketmq.store.index.IndexFile#selectPhyOffset

RocketMQ源码解析:如何快速查找消息?_java_05

前面章节说过IndexFile本质上可以理解为一个大的HashMap,其中的key有两种形式

  1. Topic#Unique Key
  2. Topic#Message Key

当根据Unique Key和Message Key查找消息时,求出对应key的hash值,然后将这个hash值对应的所有的消息从IndexFile中查找出来,接着根据phyoffset从CommitLog中找到消息具体的内容,然后返回

按照这种形式返回的消息有问题吗?

其实是有问题的,因为hash值相同,并不代表消息的Unique Key或Message Key相同,所以此时还需要在客户端根据具体的值再过滤一遍,源码如下

org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)

RocketMQ源码解析:如何快速查找消息?_apache_06


如果是Unique Key,则直接判等。如果是Message Key,因为一个消息可以设置多个Message Key,所以只要有匹配则会将消息返回

追一下调用链路大概就全懂了

  1. org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)
  2. org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage
  3. org.apache.rocketmq.store.index.IndexFile#selectPhyOffset

参考博客

[1]http://www.tianshouzhi.com/api/tutorials/rocketmq/410


举报

相关推荐

0 条评论