文章目录
1.0 RabbitMQ 初识
RabbitMQ 是基于 Erlang 语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
RabbitMQ 中间件的使用目的,基于消息通知实现异步调用,一般包含三个角色:
1)消息发送者:投递消息的人,就是原来的调用方。
2)消息 Broker:管理、暂存、转发消息,可以理解为容器。
3)消息接收者:接收和处理消息的人,服务提供方。
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息 Broker,然后接收者根据自己的需求从消息 Broker 那里订阅消息。每当发送方发送消息后,接收者都能接收消息并处理,这样发送消息的人与接收消息的人完全解耦了。
RabbitMQ 对应的框架:
其中包含的几个概念:
1)Publish:生产者,也就是发送消息的一方。
2)consumer:消费者,也就是消费消息的一方。
3)queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理。
4)exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
5)virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的 exchange、queue
1.1 RabbitMQ 安装
1)基于 Docker 来安装 RabbitMQ,使用下面命令即可:
2)如果部署在云服务器中,则还需要添加管理员,命令如下:
可以看到安装命令中有两个映射的端口:
15672:RabbitMQ 提供的管理控制台的端口。
5672:RabbitMQ 的消息发送处理接口。
安装完成后,访问自己的 "IP:15672" 即可看到管理控制台。
如果是首次访问需要登录,默认的用户名和密码就是在设置超级管理员的时候所对应的用户名和密码。
2.0 数据隔离
2.1 用户管理
点击 admin 选项卡,首先看到 RabbitMQ 控制台的用户管理界面:
Name:用户名。
Tages:administrator,说明该用户是超级管理员,拥有所有权限。
Can access virtual host:/,可以访问的 virtual host,这里的 / 是默认的 virtual host 。
添加用户:
xbs 用户已经拥有了超级管理员的权限,但是没有属于自己的虚拟主机。
因此,可以通过添加用户,且创建另一个 virtual host,实现数据隔离。
2.2 virtual host 虚拟主机
先退出原先的用户,切换到刚刚创建的 xbs 用户登录,然后点击 Virtual Host 菜单,进入 virtual hsot 管理页:
创建虚拟主机:
不同的虚拟主机之间,数据是隔离的,相互不会受到影响。简单理解成 MySQL 中的数据库,数据库与数据库之间的数据不会受到影响。
3.0 SpringAMQP
由于 RabbitMQ 采用了 AMQP 协议,因此具备跨语言的特性。任何语言只要遵循 AMQP 协议收发消息,都可以与 RabbitMQ 交互。并且 RabbitMQ 官方也提供了各种不同语言的客户端。但是,RabbitMQ 官方提供的 Java 客户端编码相对复杂,一般生产环境下更多结合 Spring 来使用,而 Spring 的官方刚好基于 RabbitMQ 提供了这样一套消息收发的模板工具:SpringAMQP,并且还基于 SpringBoot 对其实现了自动装配,使用起来非常方便。
SpringAMQP 官方地址:Spring AMQP
SpringAMQP 提供了三个功能:
1)自动声明队列、交换机及其绑定关系。
2)基于注解的监听器模式,异步接收消息。
3)封装了 RabbitTemplate 工具,用于发送消息。
3.1 RabbitMQ 配置
1)依赖引入:
2)application.yml 添加配置:
3.2 发送消息
在发送消息之前,需要先根据图形化界面创建一个 queue1 队列:
成功创建 queue1 队列:
接着在测试类 Test 中编写,并利用 RabbitTemplate 实现消息发送:
代码如下:
测试结果:
可以看到 queue1 队列中接收到消息了:
3.3 接收消息
刚刚发送的消息,已经到 queue1 队列中,但是没有消费者进行消费,所以现在创建消费者进行处理消息。
在 SpringAMQP 中,是使用了监听器来进行对绑定的队列进行监听,将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
代码如下:
注意:需要加上 @Component 注解,成为 IOC 容器中的 Bean 对象。
执行结果:
3.4 WorkQueues 模式
Work queues,任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息,如果直接将消息发送到队列中,则队列每一个消息只能被处理一次,每一个消息都不能被多个消费者同时消费。
而在 WorkQueues 模式中,就是将消息直接发送到队列中,且多个消费者绑定同一个队列。
WorkQueues 应用场景:
当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用 work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
默认情况下,消息是平均分配每个消费者,并没有考虑到消费者的处理能力,没有充分利用每一个消费者的能力,这样显然是有问题的。在 Spring 中有一个简单的配置,可以解决这个问题,通过修改 application.yml 文件,添加配置:
能者多劳,只要完成处理了消息,就能从队列获取一条消息。
测试:
1)定义了两个消费者进行对队列中的消息进行消费:
2)发送多条消息到队列中:
测试结果:
很明显看得出来,消费能力强的消费者处理队列中的消息越多。正所谓能者多劳,这样充分利用每一个消费者的处理能力,可以有效避免消息积压问题。
4.0 交换机类型
在之前没有交换机,都是生产者直接发送消息给队列,而一旦引入交换机,消息发送的模式会有很大变化:
Exchange 交换机,只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失。
而如果将消息直接发送到队列中,那么队列可以将消息进行存储,不会丢失。
交换机的类型有四种:
1)Fanout:广播,将消息交给所有绑定到交换机的队列。
2)Direct:订阅,基于 RoutingKey(路由 key)发送给订阅了消息的队列。
3)Topic:通配符订阅,与 Direct 类型,只不过 RoutingKey 可以使用通配符。
4)Headers:头匹配,基于 MQ 的消息匹配,用的较少。
4.1 Fanout 交换机
在广播模式下,消息发送流程是这样的:
-
1) 可以有多个队列
-
2) 每个队列都要绑定到 Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
广播模式演示:
先创建 Fanout 类型的交换机:
再创建两个队列 queue1、queue2:
接着 xbs.fanout 交换需要绑定 queue1、queue2 队列:
再接着用代码实现发送消息、接收消息:
代码如下:
1)接收消息:
2)发送消息:
执行结果:
4.2 Direct 交换机
在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange 。
在Direct模型下:
1)队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
2)消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey。
3)Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 与消息的 RoutingKey 完全一致,才会接收到消息。
Direct 交换机演示:
1)首先创建 xbs.direct 交换机:
2)再创建两个队列 queue3、queue4:
3)xbs.direct 交换机绑定两个队列:
4)通过 RoutingKey 路由关键字来指定。对于 queue4 队列也是同样的道理:
接着使用代码来实现发送消息、接收消息:
代码实现:
1)接收消息:
2)发送消息:
执行结果:
当指定 red 的路由关键字,那么只有队列 queue1 才能接收得到,因此对应的消费者才能进行处理。
同理,当指定 blue 的路由关键字,那么只有队列 queue2 才能接收得到,因此对应的消费者才能进行处理。
4.3 Topic 交换机
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 吧消息路由到不同的队列。只不过 Topic 类型 Exhchange 可以让队列在绑定 BindingKey 的时候使用通配符。
通配符规则:
1)#:匹配一个或多个词。
2)*:只匹配一个词。
Topic 交换机演示:
1)创建 xbs.topic 交换机:
2)创建两个队列 queue5、queue5:
3)将该两个队列进行绑定到 xbs.topic 交换机上,并且指定对应通配符的 RoutingKey 关键字:
4)使用代码实现发送消息、接收消息:
代码如下:
发送消息:
接收消息:
测试结果:
当发送的消息的关键字为 class.student 时,则会将该消息发送到 queue5、queue6 队列中,所以对应的两个消费者都能进行消费:
当发送的消息的关键字为 class.teacher 时,则该消息只会发送到 queue6 队列中:
5.0 声明队列和交换机
在之前都是基于 RabbitMQ 控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建,那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维,在这个过程中是是很容易出错的。
因此推荐的做法是由程序员启动时检查队列和交换机是否存在,如果不存在则自动创建。
5.1 基本的 API
1)SpringAMQP 提供了一个 Queue 类,用来创建队列:
2)SpringAMQP 还提供了一个 Exchange 接口,来表示所有不同类型的交换机:
我们可以自己创建队列和交换机,不过 SpringAMQP 还提供了 ExchangeBuilder 来简化这个过程:
3)而在绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象:
5.2 fanout 示例
在 FanoutExchangeCom 中创建一个类,声明两个队列和一个交换机:
当代码运行起来,则会自动创建交换机和队列:
5.3 direct 示例
在 DirectExchangeCom 中创建一个类,声明两个队列和一个交换机:
程序执行结果:
自动创建队列:
自动创建交换机:
5.4 基于注解声明
基于注解 @Bean 的方式声明队列和交换机比价麻烦,Spring 还提供了基于注解方式来声明。在监听队列 @RabbitListener 注解上进行添加相关的注解来声明交换机或者队列。
5.4.1 Fanout 模式的交换机
代码如下:
通过注解 @Queue 声明队列,@Exchange 声明交换机,可以指定类型,默认的类型为 Direct 类型的交换机。最后再通过 @QueueBinding 注解进行绑定队列到交换机中。
当程序运行起来:
tt.fanoutExchange 交换机:
tt.queue5 队列:
//基于注解声明交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "tt.queue6"),
exchange = @Exchange(name = "tt.directExchange",type = ExchangeTypes.DIRECT),
key = "tt.xbs"
))
private void listenMessageByDirect(String massage) throws InterruptedException {
log.info("sendMessageByFanout发送的消息为: " + massage);
}
5.4.2 Direct 模式的交换机
代码如下:
通过 @Queue 注解声明队列,@Exchange 注解声明交换机,且指定交换机类型。还通过 @QueueBinding 注解将队列绑定到交换机中,通过 key 指定路由关键字。
程序执行结果:
tt.directExchange 交换机:
tt.queue6 队列:
6.0 消息转换器
Spring 的消息发送代码接收的消息体是一个 Object:
此外,还需要确保 User 类实现 Serializable 接口,这是因为 SimpleMessageConverter 只支持 String、byte[] 和 Serializable 类型的消息负载。
此时队列中的消息:
在数据传输时,SimpleMessageConverter 会把发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。
只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。从所周知,JDK 序列化存在以下问题:
1)数据体积过大。
2)有安全漏洞。
3)可读性差。
6.1 配置 JSON 转化器
显然,JDK 序列化方式并不合适,期望消息体的体积更小、可读性更高,因此可以使用 JSON 方式来做序列化和反序列化。
1)引入依赖:
注意,如果项目中引入了 Spring-boot-start-wed 依赖,则无需再次引入 Jackson 依赖。
2)添加新 MessageConverter 类型
消息转换器中添加的 messageId 可以便于我们将来做幂等性判断。
配置好了 Json 消息转化器之后,进行测试:
发送的消息是 User 类型,且该 User 类不需要再实现 SimpleMessageConverter 接口。
执行结果:
可以很容易看到消息体中的内容,且字节只需要 81 个。
6.2 实现业务幂等性
幂等是一个数学概念,用函数表达式来描述:f(x)=f(f(x)) 。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
实现业务幂等可以通过唯一消息 id,是给每个消息都设置一个唯一 ID,利用 id 区分是否是重复消息:
1)每一条消息都生成一个唯一的 id,与消息一起投递给消费者。
2)消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库。
3)如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
那么通过配置 JSON 转化器中,设置 setCreateMessageIds(true) 方法,接收者都可以在消息中获取到唯一 id 。
代码如下:
1)发送消息:
通过匿名类 MessagePostProcessor 重写 postProcessMessage 方法,将消息发送之前对消息的属性进行修改,比如说配置 id 属性、配置消息头等等,而对于 postProcessMessage 方法不能携带消息本体,只能在 convertAndSend() 方法中属性进行配置,比如说发送 user 实体类。
2)接收消息:
监听 text 队列中的消息,接收的参数不再是 user,而是 Message 类型,通过 massage.getMessageProperties().getMessageId() 获取唯一 id,再通过 MessageConverter 消息转化器将 massage 的本体消息转化为 User 类型。
接收消息的结果: