文章目录
1. RabbitMQ 工作原理

解释说明:
- 生产者(
Producer):发布消息到RabbitMQ中的交换机(Exchange)上 - 交换机(
Exchange):和生产者建立连接并接收生产者的消息,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 - 队列(
Queue):Exchange将消息分发到指定的Queue,Queue和消费者进行交互。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据 - 路由(
Routes):交换机转发消息到队列的规则 - 消费者(
Consumer):监听RabbitMQ中的Queue中的消息
2. RabbitMQ 七种消息收发方式
RabbitMQ官网介绍了如下七种消息分发的形式,下面逐一代码实现。代码可在文章最后查看。
2.1 代码环境
SpringBoot : 2.5.7、RabbitMQ:3.9.13
application.properties 中配置 RabbitMQ的基本连接信息,如下:
server.port=8888
spring.rabbitmq.host=192.168.3.157
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
在 RabbitMQ中,所有的消息生产者提交的消息都会交由 Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的 Queue中。
2.2 消息收发
2.2.1 Hello World
消息传播图如下:

这种消息分发采用的是默认的Exchange,在RabbitMQ的Web客户端中,可以看到RabbitMQ提供的交换机

定义队列:
@Configuration
public class RabbitMQConfig {
// 队列的名称
public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name";
/**
* 第一个参数是消息队列名字
* 第二个参数表示消息是否持久化
* 第三个参数表示消息队列是否排他,一般都是设置为 false,即不排他
* 第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列
* @return
*/
@Bean
Queue queue() {
return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false);
}
}
消费者定义:
@Slf4j
@Component
public class ScorpiosConsumer {
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
public void consume(String msg) {
log.info("消费者收到的消息为:{}",msg);
}
}
消息发送:模拟发送请求来向RabbiMQ发送消息
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息,向RabbitMQ发送消息...");
rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios....");
log.info("-----------------------------------");
return "success";
}
}
测试结果:浏览器输入:http://localhost:8888/send/message,看控制台日志输入

在上面的代码中,并没有创建Exchange,而是使用默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上,例如消息队列名为 scorpios_queue_name,则 routingkey为 scorpios_queue_name”的消息会被该消息队列接收。
正如官网介绍这种方式的一样:The simplest thing that does something,这种方式最为简单
2.2.2 Work queues
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。
消费者也可以配置各自的并发能力,进而提高消息的消费能力,消费者也可以配置手动 ack,来决定是否要消费某一条消息
先看并发能力的配置,如下:
@Slf4j
@Component
public class ScorpiosConsumer {
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
public void consumeOne(String msg) {
log.info("consumeOne消费者收到的消息为:{}",msg);
}
// 表示此消费者会创建5个线程来执行
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5")
public void consumeTwo(String msg) {
log.info("consumeTwo消费者收到的消息为:{}",msg);
}
}
第二个消费者我配置了 concurrency为 5,此时,对于第二个消费者,将会同时存在5 个子线程去消费消息
启动项目,在 RabbitMQ后台也可以看到一共有 6个消费者。一个连接,具有6个Channel

此时,如果生产者发送 5条消息,就会一下都被消费掉
消息发送方式如下:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息,向RabbitMQ发送消息...");
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios...." + i);
}
log.info("-----------------------------------");
return "success";
}
}
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

可以看到,消息都被第二个消费者消费了。但需要注意,多试几次可以看到,消息也有可能被第一个消费者消费
下面来看一下,消费者开启手动 ack,这样可以自行决定是否消费 RabbitMQ发来的消息,配置手动 ack需要在配置文件中添加如下配置:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费代码如下:
@Slf4j
@Component
public class ScorpiosConsumer {
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
public void consumeOne(Message message, Channel channel) throws IOException {
log.info("consumeOne消费者收到的消息为:{}",message.getPayload());
// 确认消费消息
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
}
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5")
public void consumeTwo(Message message, Channel channel) throws IOException {
log.info("consumeTwo消费者收到的消息为:{},消费线程为:{}", message.getPayload(), Thread.currentThread().getName());
// 拒绝消费消息
channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
}
}
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息
2.2.3 Publish/Subscribe
一个生产者(Producer),一个交换机(Exchange),多个消费者(Consumer),每一个消费者都有自己的一个队列(Queue)
生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的
需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ中 Exchange不具备存储消息的能力,只有队列具备存储消息的能力,如下图:

这里交换机有四种选择,分别是:
Direct(直接)Fanout(扇出)Topic(主题)Header(标题)
2.2.3.1 Direct
DirectExchange的路由策略是将消息队列(Queue)绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上。
配置类:
@Configuration
public class RabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列的名称
public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name";
/**
* 创建一个DirectExchange交换机
* 第一个参数:交换机名字
* 第二个参数:重启后是否依然有效
* 第三个参数:长期未用时是否删除
* @return
*/
@Bean
DirectExchange directExchange(){
return new DirectExchange(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue queue() {
return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false);
}
// 将队列与DirectExchange绑定,要指定routingkey
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
}
}
Binding其实是Exchange和Queue之间的桥梁,它告诉我们Exchange和哪个Queue进行了绑定关系
消费者:
@Slf4j
@Component
public class DirectConsumer {
@RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
public void consume(String msg) {
log.info("consume消费者收到的消息为:{}",msg);
}
}
发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息");
// 要添加routingkey参数
rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"direct","hello scorpios...");
return "success";
}
}
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

结合下面这张图再次理解下,交换机的类型为DirectExchange:
- 只有
routingkey为orange时,消息才会转到Q1队列 routingkey为black、green时,消息才会转到Q2队列

2.2.3.2 Fanout
FanoutExchange的数据交换策略是把所有到达 FanoutExchange的消息转发给所有与它绑定的 Queue上,在这种策略中,routingkey将不起任何作用,FanoutExchange配置方式如下:
@Configuration
public class FanoutRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列的名称
public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_one";
public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_two";
/**
* 创建一个FanoutExchange交换机
* 第一个参数:交换机名字
* 第二个参数:重启后是否依然有效
* 第三个参数:长期未用时是否删除
* @return
*/
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue queueOne() {
return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
}
@Bean
Queue queueTwo() {
return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
}
// 将队列与FanoutExchange绑定
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
上面创建 FanoutExchange,参数含义与创建 DirectExchange参数含义一致,然后创建两个 Queue,再将这两个 Queue都绑定到 FanoutExchange上。接下来创建两个消费者,如下:
@Slf4j
@Component
public class FanoutConsumer {
@RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
public void consumeOne(String msg) {
log.info("consumeOne消费者收到的消息为:{}",msg);
}
@RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
public void consumeTwo(String msg) {
log.info("consumeTwo消费者收到的消息为:{}",msg);
}
}
发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息");
// routingkey 参数为null
rabbitTemplate.convertAndSend(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,"hello scorpios... FanoutExchange");
return "success";
}}
}
注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

来看下面这张图:

如果Exchange的绑定类型是Direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是Direct,但是它表现的就和Fanout有点类似了,就跟广播差不多,如上图所示。
2.2.3.3 Topic
TopicExchange是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange中,Queue通过 routingkey绑定到 TopicExchange上,当消息到达 TopicExchange后,TopicExchange 根据消息的routingkey 将消息路由到一个或者多个Queue `上。

TopicExchange配置如下:
@Configuration
public class TopicsRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列的名称
public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_xiaomi";
public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_huawei";
public static final String SCORPIOS_QUEUE_NAME_THREE = "scorpios_queue_name_phone";
/**
* 创建一个TopicExchange交换机
* 第一个参数:交换机名字
* 第二个参数:重启后是否依然有效
* 第三个参数:长期未用时是否删除
* @return
*/
@Bean
TopicExchange topicExchange(){
return new TopicExchange(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue xiaomi() {
return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
}
@Bean
Queue huawei() {
return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
}
@Bean
Queue phone() {
return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE,true,false,false);
}
// 将队列与TopicExchange绑定
@Bean
Binding bindingXiaomi() {
return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
}
@Bean
Binding bindingHuawei() {
return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
}
@Bean
Binding bindingPhone() {
return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
}
}
- 创建 一个
TopicExchange,三个Queue,第一个Queue用来存储和xiaomi有关的消息,第二个Queue用来存储和huawei有关的消息,第三个Queue用来存储和phone有关的消息 - 将三个
Queue分别绑定到TopicExchange上,- 第一个
Binding中的xiaomi.#表示消息的routingkey凡是以xiaomi开头的,都将被路由到名称为xiaomi的Queue上 - 第二个
Binding中的huawei.#表示消息的routingkey凡是以huawei开头的,都将被路由到名称为huawei的Queue上 - 第三个
Binding中的#.phone.#则表示消息的routingkey中凡是包含phone的,都将被路由到名称为phone的Queue上
- 第一个
接下来针对三个 Queue创建三个消费者,如下:
@Slf4j
@Component
public class TopicsConsumer {
@RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
public void consumeXiaomi(String msg) {
log.info("consumeXiaomi消费者收到的消息为:{},匹配routingkey:xiaomi.#",msg);
}
@RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
public void consumeHuawei(String msg) {
log.info("consumeHuawei消费者收到的消息为:{},匹配routingkey:huawei.#",msg);
}
@RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE)
public void consumePhone(String msg) {
log.info("consumePhone消费者收到的消息为:{},匹配routingkey:#.phone.#",msg);
}
}
发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息");
rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.news","小米新闻,xiao.news");
rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.news","华为新闻,huawei.news");
rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.phone","小米手机,xiaomi.phone");
rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.phone","华为手机,huawei.phone");
rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"phone.news","手机新闻,phone.news");
return "success";
}
}
根据 TopicsRabbitMQConfig中的配置,测试结果应该如下:
- 第一条消息将被路由到名称为
xiaomi的Queue上 - 第二条消息将被路由到名为
huawei的Queue上 - 第三条消息将被路由到名为
xiaomi以及名为phone的Queue上 - 第四条消息将被路由到名为
huawei以及名为phone的Queue上 - 第五条消息则将被路由到名为
phone的Queue上
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

2.2.3.4 Header
HeadersExchange是一种使用较少的路由策略,HeadersExchange会根据消息的 Header将消息路由到不同的 Queue上,这种策略也和 routingkey无关,HeadersExchange配置如下:
@Configuration
public class HeaderRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列的名称
public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_name";
public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_age";
/**
* 创建一个HeadersExchange交换机
* 第一个参数:交换机名字
* 第二个参数:重启后是否依然有效
* 第三个参数:长期未用时是否删除
* @return
*/
@Bean
HeadersExchange headersExchange(){
return new HeadersExchange(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue queueName() {
return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
}
@Bean
Queue queueAge() {
return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
}
// 将队列与HeadersExchange绑定
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "scorpios");
return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge()).to(headersExchange()).whereAny("age").exist();
}
}
这里主要关注下 Binding的配置上,第一个 bindingName方法中,whereAny表示消息的 Header中只要有一个 Header匹配上 map中的 key/value,就把该消息路由到名为 scorpios_queue_name_name的 Queue上,这里也可以使用 whereAll方法,表示消息的所有 Header都要匹配。whereAny和 whereAll实际上对应了一个名为 x-match 的属性。bindingAge中的配置则表示只要消息的 Header中包含 age,不管 age的值是多少,都将消息路由到名为 scorpios_queue_name_age的 Queue上
消费者:
@Slf4j
@Component
public class HeaderConsumer {
@RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
public void consumeName(String msg) {
log.info("consumeName消费者收到的消息为:{}",msg);
}
@RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
public void consumeAge(String msg) {
log.info("consumeAge消费者收到的消息为:{}",msg);
}
}
发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String test(){
log.info("接收到客户端消息");
Message name = MessageBuilder.withBody("header exchange, scorpios_queue_name_name".getBytes())
.setHeader("name", "scorpios").build();
Message age = MessageBuilder.withBody("header exchange, scorpios_queue_name_age".getBytes())
.setHeader("age", "20").build();
rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,name);
rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,age);
return "success";
}
}
创建并发送两条消息,两条消息具有不同的 header,不同 header的消息将被发到不同的 Queue中去
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

2.2.3.5 小结
DirectExchange、TopicExchange都需要用到routingkeyFanoutExchange、HeadersExchange不需要routingkey- DirectExchange类型交换机,如果它绑定的多个队列的
key如果都相同,虽然绑定类型是Direct,但是它表现和FanoutExchange有点类似,和广播差不多
2.2.4 Routing
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange后,根据 RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey即可,看下图理解下:

这种方式就是按照 routing key 去路由消息,可以参考上面DirectExchange、TopicExchange使用
2.2.5 Topics
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 TopicExchange并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写使用通配符,看下图理解下:

这种方式就是按照 routing key 去路由消息,可以参考上面TopicExchange使用
2.2.6 RPC
RabbitMQ提供了RPC功能,原理图如下:

原理解释:
Client发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是correlation_id,这个表示这条消息的唯一id,还有一个内容是reply_to,这个表示消息回复队列的名字Server从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到reply_to指定的回调队列中Client从回调队列中读取消息,就可以知道消息的执行结果
具体示例,下一篇实现~
2.2.7 Publisher Confirms
在解决消息可靠性的问题时,有两种方式:事务和消息确认。
对于消息是否被成功消费,可以使用这种方式——消息确认机制。消息确认分为:自动确认和手动确认。
在上面的代码中,大部分都使用了自动确认。除了在介绍Work Queues方式时,消费者开启了手动 ack
这种方式很重要,后续单独研究吧~~










