0
点赞
收藏
分享

微信扫一扫

SpringBoot-RabbitMQ消息的消费与签收机制

消息的签收机制说明

消息消费成功后,我们在客户端签收后,消息就从MQ服务器里面删除了若消息没有消费成功,我们让他回到MQ里面,让别人再次重试消费。

自动签收

消息只要被客户端接收到,无论你客户端发生了什么,我们服务器都不管你了,直接把消息删除了,这是它是默认的行为。

手动签收

创建项目 springboot-rabbitmq,创建方式和之前的方式一样依赖也是。

修改application.yml配置文件:

server:
port: 8080
spring:
application:
name: Springboot-RabbitMQ
rabbitmq:
username: user
password: 123456
host: 139.196.183.130
port: 5672
virtual-host: v-it6666
# NONE 值是禁用发布确认模式,是默认值
# CORRELATED 值是发布消息成功到交换机后会触发回调方法
publisher-confirm-type: correlated
# 这个是老版本的用法
# publisher-confirms: true
# 消息由交换机到达队列失败时触发
publisher-returns: true
listener:
simple:
# 自动签收,这个是默认行为
# acknowledge-mode: auto
# 手动签收
acknowledge-mode: manual
direct:
# 设置直连交换机的签收类型
acknowledge-mode: manual

消息投递的 ID 说明

SpringBoot-RabbitMQ消息的消费与签收机制_RabbitMQ

获取投递的 ID

SpringBoot-RabbitMQ消息的消费与签收机制_spring_02

/**
* @author BNTang
*/
@Component
public class MessageReceive {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"error"},
exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT)
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
// 消息投递ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// messageId 就是消息的唯一的标识,自己定义
String messageId = message.getMessageProperties().getMessageId();

System.out.println("消费者收到消息 → 消息对象:" + message);
System.out.println("消费者收到消息 → 内容为:" + content);
System.out.println("消费者收到消息 → 信道:" + channel);
System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);

channel.basicAck(deliveryTag, false);
}
}

basicAck方法参数的解释如下:

  • deliveryTag:消息投递ID,要签收的投递ID。
  • multiple:是否批量签收。

投递 ID 存在的问题及消息永久 ID 设置的问题

什么能代表消息的唯一的标识,显然投送的 ID 不行,因为一个消息可能会有多个投送的 ID,我们就需要给消息一个唯一的值,这个伴随消息终身,不会变化!我们需要发送消息时,给消息设置一个 ID,然后保证该 ID 唯一就可以了,如下所示!

@Test
void sendMsg() throws IOException {
for (int i = 0; i < 5; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "我是一个测试消息" + i,
message -> {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId(messageId);
return message;
});
System.out.println("消息发送成功");
System.in.read();
}
}

关于批量签收消息

若我们此时签收了编号为4的消息,但是前面的0,1,2,3 都没有签收,则MQ若是批量的签收,它会把0,1,2,3 都签收,因为MQ认为,比他晚投递的已经签收,前面的肯定已经消费成功了。

生产者

static int a = 1;

@Test
public void sendMessage() throws IOException {
for (int i = 0; i <= 3; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "ABC - " + i, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId((a++) + "");
return message;
}
});
}
System.out.println("消息发送成功");
System.in.read();
}

消费者

/**
* @author BNTang
*/
@Component
public class MessageReceive {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();

System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);

if (content.equals("ABC - 3")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功 → 内容为:" + content);
}
}
}

SpringBoot-RabbitMQ消息的消费与签收机制_SpringBoot_03

可以发现只签收了ABC - 3 但是队列里面没有消息了,说明前面的12都被批量签收了。

不签收

当我们认为消息不合格时,或不是我们要的消息时,我们可以选择不签收它。

生产者

@Test
public void sendMessage() throws IOException {
this.rabbitTemplate.convertAndSend("directs", "error", "1234567", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId(messageId);
return message;
}
});
System.out.println("消息发送成功");
System.in.read();
}

消费者

/**
* @author BNTang
*/
@Component
public class MessageReceive {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();

System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);

if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功");
} else {
// 如果不是 1234567 就决绝签收
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被决绝签收");
}
}
}

如上的代码测试方式你先发送一个消息,消息内容为 1234567 这是正常的情况,然后在发送一个 123456 就会发现效果,消息消费死循环了。

我们选择不签收,其实是为了保护消息,当消费消息发生异常时,我们可以把消息放在队列里面,让它重新投递,重新让别人消费!而不是丢了它!

解决不签收消息的死循环

不签收,并且让它回到队列里面,想法很好,但是很容易造成死循环,因为没有任何人能消费她! 我们设计一个机制,当一个消息被消费3次还没有消费成功,我们就直接把它记录下来,人工处理! 消息消费3次(消息的标识,消息的计数)我们引入Redis,使用Redis计数,若超过3次,直接拒绝消息,并且不回到队列里面。

引入 Redis 依赖,并使用 Docker 运行 Redis,Redis 依赖如下:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Docker 运行 Redis 命令脚本如下所示,当然也可以使用本地的Redis图方便,我这里是有机子我就用我的机子了:

docker run -d --name myredis -p 6390:6379 redis --requirepass "1234"

修改消费者的配置文件

SpringBoot-RabbitMQ消息的消费与签收机制_RabbitMQ_04

server:
port: 8002
spring:
application:
name: consumer
rabbitmq:
host: 139.196.183.130
port: 5672
username: user
password: 123456
virtual-host: v-it6666
# Redis的配置
redis:
host: 139.196.183.130
port: 6390
password: 1234

改造消费者,改造之后的代码如下:

/**
* @author BNTang
*/
@Component
public class MessageReceive {

@Autowired
private StringRedisTemplate redisTemplate;

/**
* 消息的前缀
*/
private String MESSAGE = "message:";

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();

System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);

if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功");
} else {
String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);

if (count != null && Long.valueOf(count) >= 3) {
channel.basicNack(deliveryTag, false, false);
System.out.println("该消息消费【3】次都失败,我们记录它,人工处理" + content);
} else {
// 如果不是 1234567 就决绝签收
// 处理业务逻辑【可能逻辑处理的出现了问题啥的】
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被决绝签收");

// 因为拒绝了,我们把消息ID放到Redis里面
this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
}
}
}
}

如上basicNack方法参数的解释如下所示:

  • deliveryTag:消息的投递ID,要签收的投递ID是多少
  • multiple:是否批量签收
  • requeue:true,代表决绝签收,并把消息重新放回队列里面,false,直接拒绝签收

测试注意,因为统计计数时,消息的次数,是通过消息的 ID 来计数的,我们在发送消息时,要设置消息的头:

SpringBoot-RabbitMQ消息的消费与签收机制_redis_05




举报

相关推荐

0 条评论