0
点赞
收藏
分享

微信扫一扫

RabbitMQ 消息确认机制

之前的文章我们已经介绍了 RabbitMQ 的基本使用,但是在默认情况下 RabbitMQ 并不能保证消息是否发送成功、以及是否被成功消费掉。消息在传递过程中存在丢失的可能。基于这样的现状,就有了消息的确认机制,来提高消息传递过程中的可靠性。

RabbitMQ 中,消息的确认机制包含以下两个方面:

  • 消息发送确认,生产者发送消息的确认包含两部分:
    1、生产者发送的消息是否成功到达交换机
    2、消息是否成功的从交换机投放到目标队列
  • 消息接收确认,消费者接收消息有三种不同的确认模式:
    1、AcknowledgeMode.NONE:不确认,这是默认的模式,默认所有消息都被成功消费了,直接从队列删除消息。存在消息被消费过程中由于异常未被成功消费而掉丢失的风险。
    2、AcknowledgeMode.AUTO:自动确认,根据消息被消费过程中是否发生异常来发送确认收到消息拒绝消息的指令到 RabbitMQ 服务。这个确认时机开发人员是不可控的,同样存在消息丢失的风险。
    3、AcknowledgeMode.MANUAL:手动确认,开发人员可以根据实际的业务,在合适的时机手动发送确认收到消息拒绝消息指令到 RabbitMQ 服务,整个过程开发人是可控的。这种模式也是我们要重点介绍的。

一、准备环境

创建 SpringBoot 项目,添加 RabbitMQ 依赖。

这里将生产者和消费者放在一个项目。

application.properties中添加连接 RabbitMQ 服务的配置,以及开启消息确认机制需要的配置:

server.port=8080
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启消息是否已经发送到交换机的确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息未成功投递到目标队列时将消息返回
spring.rabbitmq.publisher-returns=true
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

创建交换机、队列,并完成绑定:

@Configuration
public class AckRabbitMQConfig {
// Fanout交换机
@Bean
FanoutExchange ackExchange() {
return new FanoutExchange("ack.exchange", true, false);
}

// 消息队列
@Bean
Queue ackQueue() {
return new Queue("ack.queue", true);
}

// 绑定队列和交换机
@Bean
Binding ackBinding() {
return BindingBuilder.bind(ackQueue()).to(ackExchange());
}
}

二、消息发送确认

消息发送确认的第一部分,是确认消息是否已经成功发送到交换机,我们需要实现RabbitTemplate.ConfirmCallback接口:

@Service
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData
* @param ack true 表示消息成功发送到交换机,false 则发送失败
* @param cause 消息发送失败的原因
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已经发送到交换机!");
} else {
System.out.println("消息发送到交换机失败:" + cause);
}
}
}

消息无论是否成功到达交换机都会调用confirm方法。

消息发送确认的第二部分,就是消息是否成功的从交换机投放到目标队列,需要实现RabbitTemplate.ReturnsCallback接口:

@Service
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("未成功投递到队列的消息:"+ returned.toString());
}
}

returnedMessage方法只会在消息未成功投递到目标队列时被调用ReturnedMessage就是投递失败的消息基本信息。

定义好了两种消息发送确认服务,接下来就是配置消息发送确认服务,可以放在 RabbitMQ 配置类里进行全局配置:

@Configuration
public class AckRabbitMQConfig {
@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
ConfirmCallbackService confirmCallbackService;

@Autowired
ReturnCallbackService returnCallbackService;

@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
}
......
......
}

也可以在发送消息时单独配置:

@Service
public class SendMessageService {
@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
ConfirmCallbackService confirmCallbackService;

@Autowired
ReturnCallbackService returnCallbackService;

public void send(String message) {
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
rabbitTemplate.convertAndSend("ack.exchange", "", message);
System.out.println("生产者发送的消息:" + message);
}
}

三、消息接收确认

消息接收确认的实现就相对简单一些:

@Service
public class ReceiveMessageService {
@RabbitListener(queues = "ack.queue")
public void receive(String msg, Channel channel, Message message) {
try {
// int i = 1/0;
// 确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("消费者确认收到消息:" + msg);
} catch (Exception e) {
try {
// 拒绝消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System.out.println("消费者拒绝消息:" + msg);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}

使用消息接收的手动确认模式时,接收消息的方法需要额外添加ChannelMessage两个类型的参数。

Channel就是信道,在学习 Java client 操作 RabbitMQ 时,就是用它来发送接收消息的,不了解的可以复习一下。Message是 RabbitMQ 封装的消息类,里边包含了消息体、消息序号、以及交换机、队列等一些相关的信息。

这样我们就可以根据实际的业务需求,在适当的时机告诉 RabbitMQ 服务,消息已经成功消费,或者被拒绝消费。

这就涉及如下几个方法了:

  • basicAck,确认收到消息,即消息消费成功,执行该方法后,消息会被从队列删除。该方法的参数含义如下:
    1、deliveryTag:消息投递的序号,就是1、2、3、4这样的递增整数。
    2、multiple:是否批量确认消息,false 表示只确认当前 deliveryTag 对应的消息,true 表示会确认小于当前 deliveryTag 但还未被确认的消息。
  • basicNack,拒绝消息,由于发生异常等原因,消息没有被成功消费。和 basicAck 方法相比多了一个参数:
    1、requeue:true 表示被拒绝的消息会重新进入队列头部。
  • basicReject,和 basicNack 方法的作用类似,但是少了 multiple 参数。

这里有两个问题需要注意:

1、

如果拒绝消息时,设置requeuetrue,由于消息会重新进入队列头部,接下来又会被消费者处理,这样很可能陷入死循环,耗尽服务器资源,很危险的。所以在设置requeuetrue时,需要慎重考虑。

拒绝消息时一般都是由于发生异常、或者业务上的错误,导致消费流程不能正常进行下去,可以考虑将此时的消息发送到死信队列,后续再单独处理。具体怎么实现,后期会有专门的文章介绍,目前先了解即可。

2、

如果开启了消息接收的手动确认模式,但是消费消息时却没有做任何消息确认成功或拒绝的应答操作,则对应的消息会变成Unacked状态:

如果消费者客户端不重启,则Unacked状态的消息会一直堆积,不会被删除,也不会被重新消费。

如果消费者客户端重启,则消息会自动变为Ready状态,这样又会被重新消费一次。

三、效果测试

可以通过如下接口来发送消息:

@RestController
public class SendMessageController {
@Autowired
private SendMessageService sendMessageService;

@GetMapping("/send/{msg}")
public void send(@PathVariable("msg") String msg) {
sendMessageService.send(msg);
}
}

要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。

由于RabbitTemplate.ReturnsCallbackreturnedMessage方法只会在消息未成功投递到目标队列时被调用,所以要测试消息是否成功的从交换机投放到目标队列,可以注释掉AckRabbitMQConfig中交换机和队列绑定的代码,或者在后台进行交换机和队列的解绑:

这样消息自然不能成功的从交换机投放到队列。

至于消息接收确认,可以自行模拟不同的业务场景测试。

本文完!

举报

相关推荐

0 条评论