0
点赞
收藏
分享

微信扫一扫

RocketMQ基础篇(下)

Java架构领域 2021-10-09 阅读 69

大纲

玩转各种消息

1.普通消息

    整体流程如下

    >导入MQ客户端依赖

        <dependency>

            <groupId>org.apache.rocketmq</groupId>

            <artifactId>rocketmq-client</artifactId>

            <version>4.8.0</version>

        </dependency>

    >消息发送者步骤

        1.创建消息生产者producer,并指定生产者组名

        2.指定NameServer地址

        3.启动producer

        4.创建消息对象,指定Topic、Tag和消息体

        5.发送消息

        6.关闭生产者producer

    >消息消费者步骤

        1.创建消费者Consumer,指定消费者组名

        2.指定NameServer地址

        3.订阅主题Topic和Tag

        4.设置回调函数,处理消息

        5.启动消费者consumer

(1)消息发送

发送同步消息

    这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    代码演示

    同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

    >Message ID

    消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条信息。

    >SendStatus

    发送的标识。成功,失败等。

    >Queue

    相当于是Topic的分区:用于并行发送和接收消息。

发送异步消息

    异常消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    代码演示

消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

单向发送

    这种方式主要用在不特别关心发送结果的场景,例如:日志发送。

    代码演示

    单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

消息发送的权衡

(2)消息消费

集群消费

    消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

    实际上,每个Consumer是平均分摊Message Queue的去做拉取消息。例如:某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

    而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

    这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker

    代码演示

广播消费

    消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。

    实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

    这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地

    代码演示

消息消费时的权衡

    集群模式:适用场景&注意事项

    消费端集群化部署,每条消息 只需要被处理一次。

    由于消费进度在服务端维护,可靠性更高。

    集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。

    集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

    广播模式:适用场景&注意事项

    广播消费模式下不支持顺序消息。

    广播消费模式下不支持重置消费位点。

    每条消息都需要被相同逻辑的多台机器处理。

    消费进度在客户端维护,出现重复的概率稍大于集群模式。

    广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

    广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

    广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

    目前仅Java客户端支持广播模式。

    广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

2.顺序消息

    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    >全局顺序消息

    >部分顺序消息

(1)顺序消息生产

    一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。


使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,consume消费消息失败时,不能返回reconsume--later,这样会导致乱序。

应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

(2)顺序消息消费

    消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。

3.消息发送时的重要方法/属性

(1)属性

org.apache.rocketmq.example.details. ProducerDetails 类中

    producerGroup:生产者所属组

    defaultTopicQueueNums:默认主题在每一个Broker队列数量

    sendMsgTimeout:发送消息默认默认超时时间,默认3s

    compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

    retryTimesWhenSendFailed:同步方式发送消息重复次数,默认为2,总共执行3次

    retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2

    retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

    maxMessageSize:允许发送的最大消息长度,默认为4M

(2)方法

    org.apache.rocketmq.example.details. ProducerDetails 类中

单向发送

同步发送

异步发送

4.消息消费时的重要方法/属性

    org.apache.rocketmq.example.details. ComuserDetails 类中

(1)属性

(2)方法

    void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器

    void unsubscribe(final String topic):取消消息订阅

    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列

    void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器

void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器

(3)消费确认(ACK)

    业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条) 是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。

    返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。

    如果业务的回调没有处理好而抛出异常,会认为是消费失败 ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。

    为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

    另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态, 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

5.延时消息

(1)概念介绍

    延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。

(2)适用场景

    消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

(3)使用方式

    Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。(阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源)。

    发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。

    延迟消息是根据延迟队列的 level 来的,延迟队列默认是

    msg.setDelayTimeLevel(3)代表延迟 10 秒

    "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。

(4)代码演示

    org.apache.rocketmq.example. scheduled 包中

生产者

消费者

6.批量消息

    批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

(1)代码演示

    org.apache.rocketmq.example. batch 包中

生产者

消费者

(2)批量切分

    如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割。

代码演示

    我们需要发送 10 万元素的数组,这个量很大,怎么快速发送完。同时每一次批量发送的消息大小不能超过 4M 。

    具体见代码

7.过滤消息

    org.apache.rocketmq.example. filter 包中

(1)Tag过滤

    在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。

    消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。

(2)Sql过滤

SQL基本语法

RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:

    数值比较:比如:>,>=,<,<=,BETWEEN,=;

    字符比较:比如:=,<>,IN;

    IS NULL 或者 IS NOT NULL;

    逻辑符号:AND,OR,NOT;

    常量支持类型为:

    数值,比如:123,3.1415;

    字符,比如:'abc',必须用单引号包裹起来;

    NULL,特殊的常量;

    布尔值,TRUE 或 FALSE;

消息生产者(加入消息属性)

    发送消息时,你能通过 putUserProperty 来设置消息的属性。

消息消费者(使用SQL筛选)

    用 MessageSelector.bySql 来使用 sql 筛选消息。

如果这个地方抛出错误:说明 Sql92 功能没有开启。

需要修改 Broker.conf 配置文件。

加入 enablePropertyFilter=true 然后重启 Broker 服务。

8.事务消息

    其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

(1)正常事务流程

    (1)发送消息(half 消息):图中步骤 1。

    (2)服务端响应消息写入结果:图中步骤 2。

    (3)根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。

    (4)根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4

(2)事务补偿流程

    (1)对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。

    (2)Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。

    (3)根据本地事务状态,重新 Commit 或者 Rollback::图中步骤 7。

    其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

(3)事务消息状态

    事务消息共有三种状态,提交状态、回滚状态、中间状态:

   > TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成图中了 1,2,3,4 步,第 4 步是Commit)。 

    >TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了 1,2,3,4 步, 第 4 步是 Rollback)。

    >TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成图中了 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。

(4)代码演示

    org.apache.rocketmq.example. transaction 包中

创建事务性生产者

    使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。

实现事务的监听接口

    当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务(步骤 3)。它返回前一节中提到的三个事务状态之一。 checkLocalTranscation 方法用于检查本地事务状态(步骤 5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

(5)使用场景

    用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、余额进行回退。如何保证数据的完整性?

    可以使用 RocketMQ 的分布式事务保证在下单失败后系统数据的完整性。

(6)使用限制

    1. 事务消息不支持延时消息和批量消息。

    2.事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。

    3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

    4.事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。

    5.事务性消息可能不止一次被检查或消费。

    6.事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。

    7.提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

    8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者ID 查询到消费者。


我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞收藏评论,我们下期见!


上一篇:RocketMQ基础篇(上)

下一篇:RocketMQ底层原理之存储设计

举报

相关推荐

0 条评论