0
点赞
收藏
分享

微信扫一扫

C++编程逻辑讲解step by step:图书管理系统中至少包括两个(但不限于两个)类:图书和读者。某读者借书并列出他的借书情况。

念川LNSC 2024-06-27 阅读 3

0. 引言

顺序消息在金融、电商等场景中被广泛应用,只要业务需求对流程顺序性有严格要求,就有顺序消息的应用之地。因此理解顺序消息的原理和实现路径,是我们学习rocketmq的必经之路。

1. 乱序产生在哪些阶段

在理解如何保证顺序性之前,我们要先理解什么情况会导致乱序,消息从产生到被消费的过程中,哪些阶段会导致消息乱序。

在这里插入图片描述
消息的发送实际上需要经历3个阶段:生产者发送消息到broker,broker存储消息,消费者从broker消费消息

那么我们依次来看这三个阶段是否有导致乱序的可能性

第一阶段:生产者发送消息到broker

假设我们是商品购买的场景,需要经历3个状态:创建订单、支付订单、签收订单。目前部署了多个订单管理服务节点,也就是有多个生产者,想象一个场景,生产者1先发送了订单创建消息,但是因为网络波动或者服务器异常等原因,导致发送经历了很久才到达broker,而生产者2晚1s发送了订单支付消息,传输耗时很短,反而比生产者1发送的订单创建消息更早发送到broker,这就导致支付消息反而比创建订单消息更早到达,从业务逻辑上显然不对,也无法保证顺序性了。

在这里插入图片描述
那么怎么解决呢?

首先网络波动我们肯定是无法保证的,传输过程的异常是无法干预了,那么只能从生产者着手了。

  • 通过业务时间间隔 + 业务状态控制
  • 通过同步发送 + 分布式锁
  • 同步发送 + 生产者单节点单线程

以上前两种手段一般需要我们结合着使用,如此,单纯从消息发送角度,可以控制顺序性,但是这样就够了吗?咱们往下看。

第二阶段:broker存储消息

我们继续结合broker存储消息的物理结构来探究存储消息时的顺序性问题。

前面我们聊过,rocketmq中同一个topic下是划分了多个queue的,queue才是存储消息的最小单位。而生产者在发送消息时是不指定具体的queue的,而是通过topic来指定发送。

那么就容易产生一种场景:
订单A的创建消息A1和支付消息A2,都是按照顺序发送到broker的,订单B也如此,但是因为一个topic下有多个queue,broker在分配消息时,就可能会将订单A的两个消息分别分发到两个队列,订单B的两个消息也分发到两个队列,如下图所示。
在这里插入图片描述
而消费者在消费时,可能存在多个消费者同时消费,而消费者可以监听不同队列,从而导致消费者同时获取到了订单A的创建消息和支付消息,或者先获取到了支付消息,从而打乱了先创建后支付的顺序性,如此也不满足顺序性。

那么怎么解决呢?

根本原因就是broker没有按照发送来的顺序进行存储,一个比较暴力的方式,就是控制队列数只有1个,这样可以保证一定是按照发送过来的顺序进行存储的。这个方法我们叫控制全局顺序性,他可以保证全局顺序,但是因为只有1个队列了,缺点就随之而来,那就是性能不足。
在这里插入图片描述

但实际上,我们多数场景并不要求严格的全局顺序,我们只需要保证局部顺序就够了, 怎么理解呢?

比如订单的这个场景,我们只需要同一个订单的消息保持顺序性即可,不同订单之间不要求顺序性。
也就是说只要订单A的消息A1在A2之前,订单B的消息B1在B2之前即可,至于A,B之间的顺序怎样都可以,不影响业务。
在这里插入图片描述
基于这样的考虑,rocketmq提出这样的解决办法:那就是把相同订单的消息放到同一个queue中,通过队列隔离来实现局部顺序性,那么对于单一订单而言,就只存在一个队列,也就保证了存储的消息顺序性。
在这里插入图片描述
同时还要保证同一个队列中的消息是FIFO(先进先出)的,即先发送的消息先被消费,不会出现跨序消费的问题,这就需要设置–order属性为true,下文我们详细讲解怎么设置。

broker挂了怎么办?

另外再考虑个场景,多节点部署时,如果某一个broker节点挂了,而之前某订单的消息又是存储在这个broker上面的,如何处理呢?

如果将新消息发送到其他broker上,那么之前的消息明显得不到消费,会出现乱序,同时如果broker恢复了,两个同时消费也会出现顺序问题。

这个问题在官方文档中也有解释:https://rocketmq.apache.org/zh/docs/4.x/producer/03message2/

在这里插入图片描述

第三阶段:消费者从broker消费消息

消费者端会产生乱序的场景,就在于多个同一订单的消息被不同的消费者消费了。举个例子,创建订单需要3s, 支付订单需要1s,如果同一订单的创建消息和支付消息被不同的消费者消费了,就会导致可能订单还没来得及创建时就执行支付操作了,显然会报错,顺序性就不满足了。

而当同一订单的消息被同一个消费者接收时,可以保证先创建再支付,当然前提是消费者消费这边要是同步执行的,不能异步执行。

在这里插入图片描述
那么rocektmq消费者端要实现顺序性的原理就是确保同一种类的消息被同一消费者消费。那么如何实现呢?

大家还记得我们在生产者中如何保证顺序性的吗?通过分布式锁,所以消费者端其实也是通过加锁来实现。只是这里的分布式锁没有依赖redis、zk之类的中间件,而是通过broker来生成分布式锁。

消费者端的加锁主要分成3步:
(1)消费者start启动时,消费者向broker申请对MessageQueue加锁,将消费者与队列绑定,保证后续这个队列消息只会发送给这个消费者
(2)消费者消费消息时,会申请MessageQueue锁,确保同一时间,一个队列只有一个线程处理消息
(3)为了保证消费过程中不会重复消费,还会对ProcesQueue加锁

下面我们从源码层级来分析这3个加锁过程,来帮助大家理解,如果不需要了解的,可以直接跳过查看后面的顺序消费实现章节

源码分析

  • (1)启动start方法时,消费者向broker申请加锁
    1、首先我们来回顾一下顺序消费的实现代码
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_order", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
              @Override
              public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                  byte[] body = list.get(0).getBody();
                  System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));
                  return ConsumeOrderlyStatus.SUCCESS;
              }
        });

        // 启动消费者实例
        consumer.start();

2、我们进入DefaultMQPushConsumer类的start方法,可以看到该方法里面是调用了DefaultMQPushConsumerImpl.start
在这里插入图片描述
3、在中间位置可以看到调用了ConsumeMessageOrderlyService.start()方法,而对应消费者申请分布式锁,锁定指定的队列的代码就在其中
在这里插入图片描述
4、可以看到该方法中,是将lockMQPeriodically加锁任务放入定时任务中,每20s执行一次。也就是消费者每20s会向broker进行分布式锁的续约。
在这里插入图片描述
5、我们继续往lockMQPeriodically方法中查看,该方法指向RebalanceImpl.lockAll方法,lockAll即向broker加锁该消费者占用的队列。
在这里插入图片描述

  • (2)消费者消费消息时,会申请MessageQueue锁
    1、在消费者进行消费时,会先执行下负载均衡的代码,来对topic分配队列,我们通过start方法来跟踪:DefaultMQPushConsumerImpl.start > MQClientInstance.start > RebalanceService.start > this.rebalanceService.start() > RebalanceService.run > MQClientInstance.doRebalance > MQConsumerInner.tryRebalance > DefaultMQPushConsumerImpl.tryRebalance > RebalanceImpl.doRebalance > RebalanceImpl.rebalanceByTopic

2、这里可以看到区分了两种消费模式,广播模式下是组下所有消费者都会收到消息,我们重点关注updateProcessQueueTableInRebalance方法
在这里插入图片描述
3、当为顺序消费时,会进行加锁操作,调用lock方法

在这里插入图片描述
4、lock方法中就是通过将MessageQueue进行broker加锁,以此保证同一时间只有一个消费者消费这个队列
在这里插入图片描述

  • (3)为了保证消费过程中不会重复消费,还会对ProcesQueue加锁
    1、从DefaultMQPushConsumerImpl.pullMessage拉去消息的方法查看,可以看到这里对processQueue的加锁状态进行了判断,如果加锁了,才会去获取消费偏移量,然后进行消费,以此保证不被其他线程重复消费相同的偏移量。
    在这里插入图片描述
    2、如果没有加锁,则会30s后再来执行消费,等待processQueue获取锁

    3、那么processQueue在哪儿加的锁呢?还记得咱们分析第一阶段消费者向broker申请锁吗,其实这时就会对processQueue设置locked=true,也就是每20s进行的一次加锁操作
    在这里插入图片描述
    如此通过这三次加锁,来保证消费者消费的顺序性。

2. 实现

2.1 顺序消息发送

官方文档:https://rocketmq.apache.org/zh/docs/4.x/producer/03message2

1、顺序发送的重点,在于让同类消息发送到同一队列上,rocketmq中是通过队列选择器MessageQueueSelector来实现的

我们查看send方法,需要提供3个参数:消息、队列选择器和一个arg。前两个参数可以理解,那么这个arg是什么呢,其实就是我们用于区分消息类型的标识,比如为了防止上述的混淆,我们要将同一订单的不同状态的消息都发送到同一个队列中,那么我们就可以以订单号作为这个标识,其目的就是将同一类型的消息通过这个标识进行区分。
在这里插入图片描述

2、在发送消息之间,我们需要创建队列,并且将队列指定为顺序队列,即创建队列时指定–order参数为true
(1)我们需要通过namesrv内置的mqadmin工具来实现指定
(2)进入namesrv,在其安装目录的bin目录下执行(如果你和我一样是通过docker安装的rocketmq,那直接进入docker namesrv的容器即可, 进入后的当前目录就是bin目录)

./mqadmin updateTopic -c DefaultCluster -t topic_order -o true -n localhost:9876

在这里插入图片描述
3、其次如果需要保证严格的顺序性,还需要在namesrv中配置orderMessageEnablereturnOrderTopicConfigToBroker 是 true

(注:默认配置文件为namesrv.properties(通过./mqnamesrv -p即可查看配置文件路径),也可通过创建自定义配置文件namesrv.conf, 启动namesrv时指定配置文件nohup sh bin/mqnamesrv -c conf/namesrv.conf &, 本文示例无需配置该项也可保证顺序性,但生产时建议配置)

orderMessageEnable=true
returnOrderTopicConfigToBroker=true

4、整体的顺序发送代码如下,这里我简单使用一个orderId作为区分标识(也叫区分键),将奇数和偶数消息分别视为一种消息,大家实际应用时可以根据自己的业务调整。

其MessageQueueSelector对象的定义,主要是实现其select方法,这里就是通过arg参数,做取余运算,进行队列的选择。当然大家也可以用arg的hashcode来作为标识处理

 public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 声明namesrv地址
        producer.setNamesrvAddr("localhost:9876");
        // 设置重试次数
        producer.setRetryTimesWhenSendFailed(2);
        // 启动实例
        producer.start();

        // 设置消息的topic,tag以及消息体
        Message msg = new Message("topic_test", "tag_test", "消息内容".getBytes(StandardCharsets.UTF_8));

        // 要求发送顺序:i为偶数先发,然后按照由小到大顺序发送
        for (int i = 0; i < 10; i++) {
            // 模拟偶数、奇数分别属于一类消息
            int orderId = i % 2;

            SendResult result = producer.send(msg, new MessageQueueSelector() {
                /**
                 *
                 * @param list 消息队列集合
                 * @param message 消息
                 * @param arg send方法中传入的第三参数,即orderId参数,orderId可以是Object类型
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                    Integer orderId = (Integer) arg;
                    int index = orderId % list.size();
                    return list.get(index);
                }
            }, orderId);
            System.out.println("发送结果:"+result.toString());
        }
        producer.shutdown();
    }

2.2 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_order", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
              @Override
              public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                  byte[] body = list.get(0).getBody();
                  System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));
                  return ConsumeOrderlyStatus.SUCCESS;
              }
        });

        // 启动消费者实例
        consumer.start();

        Thread.sleep(10000);
    }

消费到的消息如下所示,可以看到奇偶是分别保持顺序的,即:0,2,4,6,8 和1,3,5,7,9
在这里插入图片描述

3. springboot集成实现顺序消费

针对springboot框架的实现更加简单,因为之前的文章已经描述过了,这里不再累述,大家可以参考之前的文章顺序发送、顺序消费部分。

RocketMQ快速入门:集成springboot实现各类消息发送|异步、同步、顺序、单向、延迟、事务消息(六)附带源码

RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

举报

相关推荐

0 条评论