文章目录
消息处理流程
- SendMessageProcessor处理类接收到消息
- DefaultMessageStore实例将消息变成- IndexFile、- ConsumeQueue和- CommitLog对象
- 上述对象转成内存映射对象后进行落盘
  
 
消息存储目录结构
RocketMQ的文件存储在store文件夹里,里面包含commitlog,config,consumerqueue,index文件夹和abort,checkpoint两个文件。
文件夹:
- commitlog存储写入到- commitLog的消息内容
- config存储配置信息
- consumerqueue存储消费者队列信息
- index存储消息队列的索引文件
文件:
- abort标记- RocketMQ是否正常退出
- checkpoint存储- commitlog,- config,- consumerqueue,- index文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824
RocketMQ内有专门对应磁盘上存储文件的封装类:
- CommitLog:对应- commitlog文件
- ConsumeQueue:对应- consumerqueue文件
- IndexFile:对应- index文件
- MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
- MappedFileQueue:连续物理存储的封装类,可以通过- offset快速定位消息所在的- MappedFile
- MappedFileBuff:堆外内存
SendMessage源码
 
SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息
processRequest
 
主要流程已在代码片注释中给出:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private List<ConsumeMessageHook> consumeMessageHookList;
	public SendMessageProcessor(final BrokerController brokerController) {
        super(brokerController);
    }
	public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
            	// 解析请求体
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                // 建立消息上下文
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                // 发送消息前的逻辑
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }
	/**
	* ......
	**/
}
根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。
该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。
sendMessage
 
消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
		// 初始化响应
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        // 构建响应头
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }
        final byte[] body = request.getBody();
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }
		// 设置消息体数据
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        MessageAccessor.setProperties(msgInner, origProps);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        // 获取Broker集群名称
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        // 同步等待消息存储成功
        if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
            String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
        } else {
            // 异步
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        }
        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        	// Broker拒绝接收消息
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }









