0
点赞
收藏
分享

微信扫一扫

Debug方式讲解Rabbitmq的自动ACK和手动ACK

晚熟的猫 2022-04-29 阅读 81
java后端

文章首发于我的个人博客,到个人博客体验更佳阅读哦

介绍Rabbitmq的手动ACK和自动ACK

当消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?

这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

这两ACK要怎么选择呢?这需要看消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

自动ACK

自动ACK的演示流程

Pom.xml代码如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

首先工具类代码如下

package cn.itcast.rabbitmq.util;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/

public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址,设置自己的服务器密码
factory.setHost("******");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/***");
factory.setUsername("***");
factory.setPassword("***");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}

}

然后是生产消息方代码

下面的代码我在debug之后,当下面的蓝色代码执行完毕之后,在rabbitmq里面的Connections里面就有下面的展示,这就表示发送方的链接和rabbitmq已经连上了

然后当执行完下面的红色代码的时候,在rabbitmq里面的Channels里面就有下面的展示,这就表示发送方的通道创建好了

然后下面的黄色代码执行完毕之后,在rabbitmq的队列里面就有Hello World!这个消息了,此时我们可以看到这个消息还没有被消费

然后我们点击上面的红框里面的东西,然后在点击下面的东西,就可以看到我们刚刚发送的消息

因为有下面的绿色代码,所以发送方在执行之后,rabbitmq里面的Connections的发送链接就关闭了,rabbitmq里面的Channels的通道链接就关闭了,所以此时rabbitmq里面的Connections是下面这样

此时rabbitmq里面的Channels是下面这样

package cn.itcast.rabbitmq.simple;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* 生产者
*/

public class Send {

private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
//蓝色代码注释开始
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//蓝色代码注释结束
//红色代码注释开始
// 从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
//红色代码注释结束
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//黄色代码注释开始
// 消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//黄色代码注释结束
//绿色代码注释开始
//关闭通道和连接
channel.close();
connection.close();
//绿色代码注释结束
}
}

然后是接受方接受消息的代码

Pom.xml代码如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

下面的当下面的蓝色代码执行完毕之后,在rabbitmq里面的Connections里面就有下面的展示,这就表示接受方的链接和rabbitmq已经连上了

然后当执行完下面的红色代码的时候,在rabbitma里面的Channels里面就有下面的展示,这就表示接受方的通道创建好了

下面的绿色代码表示创建一个消费对象,这个消费者并且有一个监听事件,如果有消息的时候,会被自动调用,下面的黄色代码表示把队列和消费者绑定到一起,当执行完毕之后,队列里面的消息就会没有了

此时要注意当接受消息之后,rabbitmq里面的queues里面的消息就会被删除了

注意因为下面的接受方没有连接和通道的关闭,所以此时的接收方的Connections连接和channels通道连接还是一直没有关闭的
你可以看到这里的程序一直没有停

然后接收方的Connections连接也没有消失

然后此时Idea接收方的channels连接也没有消失,
然后我们手动关闭这个接受方的程序,就是点击这个红色按钮

然后rabbitmq里面的Connections的发送链接就关闭了,rabbitmq里面的Channels的通道链接就关闭了,所以此时rabbitmq里面的Channels是下面这样

package cn.itcast.rabbitmq.simple;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.itcast.rabbitmq.util.ConnectionUtil;

/**
* 消费者
*/

public class Recv {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 蓝色代码开始
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 蓝色代码结束
// 红色代码开始
// 创建通道
Channel channel = connection.createChannel();
// 红色代码结束
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绿色代码开始
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body)
throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
}
};
// 绿色代码结束
// 黄色代码开始
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
// 黄色代码结束
}
}
自动ACK的缺点

工具类代码如下所示

package cn.itcast.rabbitmq.util;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/

public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("47.91.248.236");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/leyou");
factory.setUsername("leyou");
factory.setPassword("leyou");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}

}

比如下面的代码
首先pom.xml代码如下所示

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

然后发送方的代码如下所示

package cn.itcast.rabbitmq.simple;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* 生产者
*/

public class Send {

private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

//关闭通道和连接
channel.close();
connection.close();
}
}

然后执行发送方的代码,然后消息队列里面就会有这个消息了

然后接受方代码如下所示,注意下面的绿色代码抛出异常了,然后此时我们执行接收方的代码,下面的蓝色代码表示自动进行ack

package cn.itcast.rabbitmq.simple;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.itcast.rabbitmq.util.ConnectionUtil;

/**
* 消费者
*/

public class Recv {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body)
throws IOException {
// body 即消息体
String msg = new String(body);
//绿色代码开始
int i=1/0;
//绿色代码结束
//红色代码开始
System.out.println(" [x] received : " + msg + "!");
//红色代码结束
}
};
//绿色代码开始
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
//绿色代码结束

}
}

执行接收方代码之后,结果如下所示,此时可以看到根本没有执行上面的红色代码,也就是说此时不算接受成功(此时如果红色代码是重要的逻辑代码,那么在实际开发里面不就有问题了)

但是此时可以看到rabbitmq里面的消息队列里面的消息已经没有了,这就是自动ack的缺点

手动ack解决自动ack的缺点

工具类代码如下所示

package cn.itcast.rabbitmq.util;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/

public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("47.91.248.236");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/leyou");
factory.setUsername("leyou");
factory.setPassword("leyou");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}

}

比如下面的代码
首先pom.xml代码如下所示

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

然后发送方的代码如下所示

package cn.itcast.rabbitmq.simple;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* 生产者
*/

public class Send {

private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

//关闭通道和连接
channel.close();
connection.close();
}
}

然后执行发送方的代码,然后消息队列里面就会有这个消息了

然后接受方代码如下所示,注意下面的绿色代码抛出异常了,然后此时我们执行接收方的代码,此时红色代码就表示手动进行ack

package cn.itcast.rabbitmq.simple;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.itcast.rabbitmq.util.ConnectionUtil;

/**
* 消费者,手动进行ACK
*/

public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body)
throws IOException {
// body 即消息体
String msg = new String(body);
//绿色代码开始
int i=1/0;
//绿色代码结束
//红色代码开始
System.out.println(" [x] received : " + msg + "!");
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
//红色代码结束
}
};
//红色代码开始
// 监听队列,第二个参数false,手动进行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
//红色代码开始
}
}

执行接收方代码之后,结果如下所示,此时可以看到根本没有执行上面的红色代码,也就是说此时不算接受成功

但是此时可以看到rabbitmq里面的消息队列里面的消息还是有的,这就是手动ack解决了自动ack的缺点

举报

相关推荐

0 条评论