消息查询介绍
当我们使用MQ的时候,一个比较好的习惯就是把发送的消息和接收到的消息打印到日志中。当我们排查问题的时候就非常方便。
例如B服务需要消费A服务发送的关闭订单的消息。如果一个订单迟迟未关闭,此时可能有三种情况
- A服务发送失败
- B服务消费失败
- B服务没有收到消息
A服务发送失败可以通过日志快速确认。
如果A服务发送成功,此时就可能根据消息的属性快速查找到具体的消息,看这条消息是否被对应的ConsumerGroup消费过,如果没消费此时说明B服务之类的配置可能有问题,导致没有收到消息。如果被消费过说明B服务消费的逻辑可能有问题,查看对应的日志排查即可,问题修复后,可以通过命令或者管控台重新发送消息即可。
那么查找指定消息的方式有几种呢?
前面我们说过,RocketMQ中用QueryMessageProcessor来处理消息查找的请求
可以看到有两种处理方式
- QueryMessageProcessor#queryMessage,根据Unique Key和 Message Key查询
- QueryMessageProcessor#viewMessageById,根据Message Id查询
Unique Key,Message Key,Message Id又是怎么来的?
Unique Key是在producer端发送消息生成的
// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息
// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息
String messageKey = UUID.randomUUID().toString();
Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey,
("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
Message Id是在broker端生成的,当将消息写入内存后,返回结果中会设置Message Id(主要内容就是broker的地址和消息写入的偏移量)
// DefaultAppendMessageCallback#doAppend
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
跑个Demo演示一下
// 指定 topicName 和消息内容,然后发送消息
Message message = new Message(TOPIC_NAME, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
SendResult
[sendStatus=SEND_OK,
msgId=C0A86130AD2D18B4AAC22CAD6B000000,
offsetMsgId=31E8336700002A9F00000000000A1AAA,
messageQueue=MessageQueue [topic=quickStartTopic, brokerName=broker-a, queueId=15], queueOffset=6]
sendStatus:发送状态
msgId:虽然命名为msgId,实际是Unique Key
offsetMsgId:Broker返回的Message ID
messageQueue:消息发送到哪个队列
queueOffset:消息在队列中的偏移量
此时我们根据msgId和offsetMsgId都能在管控台的MESSAGE ID tab页查找到对应的消息
也可以根据指定的messageKey在管控台的MESSAGE KEY tab页查找到对应的消息
消息查询可以通过命令行和管控台实现。一般情况下用管控台足够了
按照Message Id查询
通过Message Id查询时客户端会调用到如下方法
org.apache.rocketmq.client.impl.MQAdminImpl#viewMessage
根据Message Id查找消息的逻辑非常简单,从Message Id中解析出broker的地址和消息的偏移量,发送查找请求(查找请求包含消息的偏移量)到broker,broker收到请求后到commitLog根据消息的偏移量查找到对应的消息返回即可。
调用链路如下
- org.apache.rocketmq.client.impl.MQAdminImpl#viewMessage
- org.apache.rocketmq.broker.processor.QueryMessageProcessor#viewMessageById
- org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long)
根据Unique Key和 Message Key查询
通过Message Id查询时客户端会调用到如下方法
org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)
最终会调用到如下方法
org.apache.rocketmq.store.index.IndexFile#selectPhyOffset
前面章节说过IndexFile本质上可以理解为一个大的HashMap,其中的key有两种形式
- Topic#Unique Key
- Topic#Message Key
当根据Unique Key和Message Key查找消息时,求出对应key的hash值,然后将这个hash值对应的所有的消息从IndexFile中查找出来,接着根据phyoffset从CommitLog中找到消息具体的内容,然后返回
按照这种形式返回的消息有问题吗?
其实是有问题的,因为hash值相同,并不代表消息的Unique Key或Message Key相同,所以此时还需要在客户端根据具体的值再过滤一遍,源码如下
org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)
如果是Unique Key,则直接判等。如果是Message Key,因为一个消息可以设置多个Message Key,所以只要有匹配则会将消息返回
追一下调用链路大概就全懂了
- org.apache.rocketmq.client.impl.MQAdminImpl#queryMessage(java.lang.String, java.lang.String, int, long, long, boolean)
- org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage
- org.apache.rocketmq.store.index.IndexFile#selectPhyOffset
参考博客
[1]http://www.tianshouzhi.com/api/tutorials/rocketmq/410