0
点赞
收藏
分享

微信扫一扫

RocketMQ源码解析:消息发送流程


RocketMQ源码解析:消息发送流程_后端

RocketMQ发送消息的三种方式

在RocketMQ中发送消息的方式有如下三种,同步发送,异步发送和单向发送。其中前2中发送方式是可靠的,因为会有发送是否成功的应答,而单向发送只管发不管发送是否成功

同步发送消息,用来发送比较重要的消息通知

public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

异步发送消息,用来对响应时间敏感的业务场景,即发送端不能长时间等待Broker的响应

用SendCallback实现类来接收发送结果

public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);

int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback异步接收发送的结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}

单向发送消息,用来不关心发送结果的场景,比如日志发送

public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

RocketMQ五种消息类型

RocketMQ源码解析:消息发送流程_后端_02

批量消息

List<Message> messageList = new ArrayList<>();
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id001", "hello world1".getBytes()));
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id002", "hello world2".getBytes()));
messageList.add(new Message(TOPIC_NAME, TAG_NAME, "id003", "hello world3".getBytes()));
producer.send(messageList);

批量消息即一次性发送多个消息,底层实现是把Collection<Message>转为MessageBatch对象发送出去

顺序消息

顺序消息分为局部有序和全局有序

局部有序:同一个业务相关的消息是有序的,如针对同一个订单的创建和付款消息是有序的,只需要在发送的时候指定message queue即可,如下所示,将同一个orderId对应的消息发送到同一个队列

SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* @param mqs topic对应的message queue
* @param msg send方法传入的message
* @param arg send方法传入的orderId
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据业务对象选择对应的队列
Integer orderId = (Integer) arg;
int index = orderId % mqs.size();
return mqs.get(index);
}
}, orderId);

消费者所使用的Listener必须是MessageListenerOrderly(对于一个队列的消息采用一个线程去处理),而平常的话我们使用的是MessageListenerConcurrently

全局有序:要想实现全局有序,则Topic只能有一个message queue。

事务消息

事务消息的处理流程会在后续文章中详细分析

延迟消息

RocketMQ并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

Message message = new Message(TOPIC_NAME, TAG_NAME, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置消息延迟级别为2,延时5s左右
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);

我们可以通过设置消息的延迟级别来让消息变为一个延迟消息,总共有18个延迟级别,对应的延迟时间如下

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

源码解析

Producer发送消息流程

RocketMQ三种发送消息的方式,最终都会调用到DefaultMQProducerImpl#sendDefaultImpl方法,我们分析这个方法即可,整理流程如图所示

RocketMQ源码解析:消息发送流程_开发语言_03


RocketMQ源码解析:消息发送流程_java_04


我们重点看上图标红的2个过程哈,首先是根据topic选择要发送到那个MessageQueue

RocketMQ源码解析:消息发送流程_发送消息_05

rocketmq默认不开启broker规避,所以我们先分析不开启broker规避的,实现还是很简单的。

  1. 当lastBrokerName为null的时候,说明这是消息第一次发送,按照轮询的方式选一个MessageQueue即可。
  2. 当lastBrokerName不为null的时候,说明上一次发送消息失败了,这次只需要选择一个和上次brokerName不同的MessageQueue即可,有可能所有的MessageQueue所在的brokerName都和上一次的brokerName相同,所以我们还得要一个兜底策略。即都相同时,还是按照轮询方式选一个MessageQueue即可

RocketMQ源码解析:消息发送流程_发送消息_06

消息发送高可用

那这个borker规避有啥用呢?

其实producer端是通过重试和broker规避来实现消息发送的高可用的

首先说重试,当发送消息的时候可以重试次数,当发送失败时会进行重试

// 设置同步发送消息重试次数,默认2次
producer.setRetryTimesWhenSendFailed(3);
// 设置异步发送消息重试次数,默认2次
producer.setRetryTimesWhenSendAsyncFailed(3);

而broker规避的实现也比较简单,根据消息的响应时间算出broker的不可用时间

RocketMQ源码解析:消息发送流程_java_07

举个例子哈,假如说一个发送到broker-a的消息,响应时间为400ms,则算出的broker不可用时间为0,怎么算出来的呢?

只是从后向前遍历上面那个对应关系,找到第一个比响应时间小的值,然后就找到对应的broker不可用时间了(看完后面的代码你会理解的更清楚)

当前时间+broker不可用时间就是broker开始可用时间

broker名称

broker开始可用时间

broker-a

1637895389184(2021-11-26 10:56:29)

broker-b

1637896502158(2021-11-26 11:15:02)

如果后续发送消息时当前时间>=broker开始可用时间,则可以往broker发送消息,否则不可以

当producer端收到消息时,或者发送消息发生异常时,则会频繁调用如下方法,更新broker的可用时间

RocketMQ源码解析:消息发送流程_后端_08


如果发送有异常,则设消息响应时间为30000,然后计算对应的不可用时间

/**
* MQFaultStrategy
* @param brokerName
* @param currentLatency 消息的响应时间
* @param isolation 是否发生异常
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 如果发送有异常,则设消息响应时间为30000
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}

这个就是计算broker不可用的具体策略,上面我们已经画图演示了哈

// MQFaultStrategy
// 根据消息响应时间计算broker不可用时间
// private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}

return 0;
}

更新broker可用时间

// LatencyFaultToleranceImpl
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 获取失败条目
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
// broker开始可用时间 = 当前时间 + 规避时间
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}

判断当前broker是否可用,当前时间>=开始可用时间则表明当前broker可用

// FaultItem
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}

Producer的三种发送消息的方式

RocketMQ源码解析:消息发送流程_响应时间_09


消息发送有三种模式MQClientAPIImpl#sendMessage

RocketMQ源码解析:消息发送流程_响应时间_10


oneway方式发送,发送完一次就返回null,不管发送结果

同步发送会一直等Broker端的响应,然后交给MQClientAPIImpl#processSendResponse构造返回结果SendResult,并返回给用户

RocketMQ源码解析:消息发送流程_后端_11


异步方式发送当收到结果时会回调InvokeCallback#operationComplete方法

和同步发送的方式类似,当收到响应时,会交给MQClientAPIImpl#processSendResponse构造返回结果SendResult,然后返回给用户。如果你发送消息的时候设置了SendCallback(用来处理消息的返回结果),发送成功则会调用SendCallback#onSuccess方法,发送失败则会调用SendCallback#onException方法

RocketMQ源码解析:消息发送流程_响应时间_12


至此消息发送完毕,我们来看消息在broker端是如何存储的?

参考博客


举报

相关推荐

0 条评论