目录
RabbitMQ 共提供了 7 种工作模式供我们进行消息传递, 接下来逐一介绍每种模式的实现与用途。
1.简单模式队列
生产者:
/**
 * Simple-mode producer: declares the durable queue "simple" and publishes ten
 * messages to it through the default exchange ("").
 *
 * @param args unused
 * @throws IOException      if connecting, declaring or publishing fails
 * @throws TimeoutException if the connection or the close handshake times out
 */
public static void main(String[] args) throws IOException,TimeoutException {
    // 1. Create the connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // 2. Configure the factory
    factory.setHost(Constants.HOST);
    factory.setPort(Constants.PORT);
    factory.setUsername(Constants.USER_NAME);
    factory.setPassword(Constants.PASSWORD);
    factory.setVirtualHost(Constants.VIRTUAL_HOST);
    // 3. Open the connection and the channel. try-with-resources closes the
    //    channel first and then the connection, even when declaring or
    //    publishing throws, so nothing is leaked on the error path.
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 4. Declare the queue.
        //    queueDeclare(queue, durable, exclusive, autoDelete, arguments):
        //      queue      - queue name
        //      durable    - true makes the queue durable
        //      exclusive  - whether the queue is exclusive (false here)
        //      autoDelete - whether the queue is deleted automatically (false here)
        //      arguments  - extra arguments (none here)
        //    The queue is created if it does not exist yet; otherwise nothing is created.
        channel.queueDeclare("simple", true, false, false, null);
        // 5. Publish through the channel.
        //    basicPublish(exchange, routingKey, props, body):
        //      exchange   - exchange name; "" is the default exchange used in the simple case
        //      routingKey - routing key, equal to the queue name here
        //      props      - message properties (none here)
        //      body       - message payload
        for (int i = 0; i < 10; i++) {
            String msg = "hello 简单队列~"+i;
            channel.basicPublish("", "simple", null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        // 6. Resources are released by try-with-resources.
    }
}
消费者:
/**
 * Simple-mode consumer: subscribes to the durable queue "simple" with automatic
 * acknowledgement and prints every message it receives.
 *
 * <p>The process keeps running after {@code main} returns, because the open
 * connection keeps delivering messages to the registered consumer.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring or subscribing fails
 * @throws TimeoutException if the connection cannot be established in time
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Create the connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // 2. Configure the factory
    factory.setHost(Constants.HOST);
    factory.setPort(Constants.PORT);
    factory.setUsername(Constants.USER_NAME);
    factory.setPassword(Constants.PASSWORD);
    factory.setVirtualHost(Constants.VIRTUAL_HOST);
    Connection connection = factory.newConnection();
    // 3. Create the channel
    Channel channel = connection.createChannel();
    // 4. Declare the queue.
    //    queueDeclare(queue, durable, exclusive, autoDelete, arguments):
    //      durable = true, exclusive = false, autoDelete = false, no extra arguments.
    //    The queue is created if it does not exist yet; otherwise nothing is created.
    channel.queueDeclare("simple", true, false, false, null);
    // 5. Receive and consume messages.
    //    basicConsume(queue, autoAck, callback):
    //      queue    - queue name
    //      autoAck  - whether deliveries are acknowledged automatically
    //      callback - consumer object invoked for each delivery
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        // Callback executed automatically whenever a message is delivered.
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到信息:"+new String(body));
        }
    };
    channel.basicConsume("simple", true, consumer);
    // Deliberately NOT closing the channel/connection here: basicConsume only
    // registers the consumer and returns, while deliveries are dispatched
    // asynchronously. Closing right away would cancel the consumer before the
    // callback runs and, with autoAck = true, could drop messages that were
    // already handed to this client. Stop the process to end consumption.
}
2.WorkQueue(⼯作队列)
生产者:
跟简单模式类似, 只是换了一个队列名称
/**
 * Work-queue producer: declares the durable queue {@code Constants.WORK_QUEUE}
 * and publishes ten messages to it through the default exchange.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring or publishing fails
 * @throws TimeoutException if the connection or the close handshake times out
 */
public static void main(String[] args) throws IOException,TimeoutException {
    // 1. Create the connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // 2. Configure the factory
    factory.setHost(Constants.HOST);
    factory.setPort(Constants.PORT);
    factory.setUsername(Constants.USER_NAME);
    factory.setPassword(Constants.PASSWORD);
    factory.setVirtualHost(Constants.VIRTUAL_HOST);
    // 3. Open connection and channel; try-with-resources releases both
    //    (channel first) even if declaring or publishing throws.
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 4. Declare the queue (durable, non-exclusive, not auto-deleted)
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 5. Publish to the queue through the default exchange
        for (int i = 0; i < 10; i++) {
            String msg = "hello 工作队列~"+i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        // 6. Resources are released by try-with-resources.
    }
}
消费者1:
/**
 * Work-queue consumer: subscribes to {@code Constants.WORK_QUEUE} with automatic
 * acknowledgement and prints every message it receives.
 *
 * <p>The connection is intentionally left open so that the consumer keeps
 * receiving deliveries after {@code main} returns.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring or subscribing fails
 * @throws TimeoutException if the connection cannot be established in time
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Create the connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // 2. Configure the factory
    factory.setHost(Constants.HOST);
    factory.setPort(Constants.PORT);
    factory.setUsername(Constants.USER_NAME);
    factory.setPassword(Constants.PASSWORD);
    factory.setVirtualHost(Constants.VIRTUAL_HOST);
    Connection connection = factory.newConnection();
    // 3. Create the channel
    Channel channel = connection.createChannel();
    // 4. Declare the queue; it is created if it does not exist yet
    channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
    // 5. Receive and consume messages
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到信息:"+new String(body));
        }
    };
    channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
    // Deliberately NOT closing the channel/connection here: basicConsume only
    // registers the consumer, and deliveries arrive asynchronously afterwards.
    // Closing immediately would cancel the consumer before it handles anything
    // and, with autoAck = true, could drop messages already sent to this client.
}
消费者2 也是同样的代码
3 Publish/Subscribe(发布/订阅)
Fanout: 广播, 将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
生产者:
/**
 * Publish/Subscribe producer: declares a durable fanout exchange and two durable
 * queues, binds both queues to the exchange and publishes ten messages to it.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring, binding or publishing fails
 * @throws TimeoutException if the connection or the close handshake times out
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Configure the connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // the port must be opened beforehand
    connectionFactory.setUsername(Constants.USER_NAME); // account
    connectionFactory.setPassword(Constants.PASSWORD); // password
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    // 2. Open connection and channel; try-with-resources releases both
    //    (channel first) even if any later step throws.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // 3. Declare the exchange (fanout, durable)
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange (empty binding key)
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
        // 6. Publish the messages to the exchange
        for (int i = 0; i < 10; i++) {
            String msg = "hello 发布订阅队列~"+i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
        }
        System.out.println("消息发送成功");
        // 7. Resources are released by try-with-resources.
    }
}
消费者1
/**
 * Publish/Subscribe consumer: subscribes to {@code Constants.FANOUT_QUEUE1} with
 * automatic acknowledgement and prints each message delivered to it.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring or subscribing fails
 * @throws TimeoutException if the connection cannot be established in time
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // Step 1: configure the factory and connect
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(Constants.HOST);
    factory.setPort(Constants.PORT);                // the port must be opened beforehand
    factory.setUsername(Constants.USER_NAME);       // account
    factory.setPassword(Constants.PASSWORD);        // password
    factory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    Connection conn = factory.newConnection();

    // Step 2: open a channel on that connection
    Channel ch = conn.createChannel();

    // Step 3: declare the queue this consumer reads from
    ch.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);

    // Step 4: register the consumer; handleDelivery runs for every message
    // received from the queue
    ch.basicConsume(Constants.FANOUT_QUEUE1, true, new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body);
            System.out.println("接收到消息:" + text);
        }
    });
}
消费者2同理
4 Routing(路由模式)
生产者:
/**
 * Routing-mode producer: declares a durable direct exchange and two durable
 * queues, binds QUEUE1 with key "a" and QUEUE2 with keys "a", "b" and "c", then
 * publishes one message per routing key.
 *
 * @param args unused
 * @throws IOException      if connecting, declaring, binding or publishing fails
 * @throws TimeoutException if the connection or the close handshake times out
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Configure the connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // the port must be opened beforehand
    connectionFactory.setUsername(Constants.USER_NAME); // account
    connectionFactory.setPassword(Constants.PASSWORD); // password
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    // 2. Open connection and channel; try-with-resources releases both
    //    (channel first) even if any later step throws.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // 3. Declare the exchange (direct, durable)
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
        // 6. Publish one message for each routing key, in the order a, b, c
        String[] routingKeys = {"a", "b", "c"};
        for (String routingKey : routingKeys) {
            String msg = "hello 路由队列~ my routingKey is " + routingKey + "...";
            channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null, msg.getBytes());
        }
        System.out.println("消息发送成功");
        // 7. Resources are released by try-with-resources.
    }
}
消费者1
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
//5.接收信息 并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到信息:"+new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
}
消费者 2同理
5.Topics(通配符模式)
生产者:
/**
 * Topics-mode producer: declares a durable topic exchange and two durable queues,
 * binds QUEUE1 with "*.a.*" and QUEUE2 with "*.*.b" and "c.#", then publishes
 * three messages with the routing keys "ae.a.f", "ef.a.b" and "c.ef.d".
 *
 * @param args unused
 * @throws IOException      if connecting, declaring, binding or publishing fails
 * @throws TimeoutException if the connection or the close handshake times out
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 1. Configure the connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // the port must be opened beforehand
    connectionFactory.setUsername(Constants.USER_NAME); // account
    connectionFactory.setPassword(Constants.PASSWORD); // password
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    // 2. Open connection and channel; try-with-resources releases both
    //    (channel first) even if any later step throws.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // 3. Declare the exchange (topic, durable)
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange with wildcard binding keys
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
        // 6. Publish the messages
        String msg_a = "hello 路由队列~ my routingKey is ae.a.f...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());
        String msg_b = "hello 路由队列~ my routingKey is ef.a.b...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());
        String msg_c = "hello 路由队列~ my routingKey is c.ef.d ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());
        System.out.println("消息发送成功");
        // 7. Resources are released by try-with-resources.
    }
}
消费者1: 只接收*.a.*
消费者2: 接收*.*.b c.#
6 RPC(RPC通信)
客户端:
/**
 * RPC client: publishes one request to {@code Constants.RPC_REQUEST_QUEUE} with a
 * fresh correlation id and a reply-to of {@code Constants.RPC_RESPONSE_QUEUE},
 * then blocks until a reply carrying the same correlation id arrives and prints it.
 *
 * @param args unused
 * @throws IOException          if connecting, declaring, publishing or subscribing fails
 * @throws TimeoutException     if the connection or the close handshake times out
 * @throws InterruptedException if the thread is interrupted while waiting for the reply
 */
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1. Configure the connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // the port must be opened beforehand
    connectionFactory.setUsername(Constants.USER_NAME); // account
    connectionFactory.setPassword(Constants.PASSWORD); // password
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    // 2. Open connection and channel. try-with-resources closes both once the
    //    reply has been printed (or on any failure); without it the connection
    //    stays open and the client process never terminates.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        // 3. Send the request
        String msg = "hello rpc...";
        // Unique identifier for this request
        String correlationID = UUID.randomUUID().toString();
        // Request properties: correlation id plus the queue the reply should go to
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
        // 4. Receive the response.
        //    A blocking queue hands the reply from the consumer callback to this thread.
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    // Only accept the reply whose correlation id matches our request
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        // Blocks until the matching reply has been offered by the callback
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }
}
客户端(注意: 下面的代码与上文的客户端代码完全相同, 此处本应给出服务端实现, 服务端代码在本文中缺失):
/**
 * RPC client: sends a single request to {@code Constants.RPC_REQUEST_QUEUE},
 * tagged with a random correlation id and a reply-to of
 * {@code Constants.RPC_RESPONSE_QUEUE}, waits for the reply with the same
 * correlation id and prints it.
 *
 * NOTE(review): this code only publishes to the request queue and consumes from
 * the response queue; nothing here consumes requests or produces replies.
 *
 * @param args unused
 * @throws IOException          if connecting, declaring, publishing or subscribing fails
 * @throws TimeoutException     if the connection or the close handshake times out
 * @throws InterruptedException if the thread is interrupted while waiting for the reply
 */
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1. Configure the connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // the port must be opened beforehand
    connectionFactory.setUsername(Constants.USER_NAME); // account
    connectionFactory.setPassword(Constants.PASSWORD); // password
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
    // 2. Open connection and channel. try-with-resources closes both after the
    //    reply has been handled (or on failure); otherwise the open connection
    //    keeps the process alive after the result is printed.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        // 3. Send the request
        String msg = "hello rpc...";
        // Unique identifier for this request
        String correlationID = UUID.randomUUID().toString();
        // Request properties: correlation id plus the reply queue
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
        // 4. Receive the response via a blocking queue filled by the callback
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    // Correlation id matches our request: hand the reply over
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        // Blocks until the matching reply is available
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }
}
7 Publisher Confirms(发布确认)
消息丢失的其中一种情况是生产者问题: 因为应用程序故障、网络抖动等各种原因, 生产者没有成功向 broker 发送消息。
这种情况可以采用发布确认(Publisher Confirms)机制来解决。
发布确认有3种策略, 接下来我们来学习这三种策略
(1) Publishing Messages Individually(单独确认)
/**
 * Publisher-confirm strategy 1: publish each message and wait for its confirm
 * before publishing the next one. Prints the message count and elapsed time.
 *
 * @throws Exception if connecting, declaring or publishing fails, or if a confirm
 *                   wait ({@code waitForConfirmsOrDie(5000)}) does not succeed
 */
private static void publishingMessagesIndividually() throws Exception {
    // The channel is scoped as well so that it is closed before the connection.
    try (Connection connection = createConnection();
         Channel channel = connection.createChannel()) {
        // 1. Channel opened above
        // 2. Put the channel into confirm mode
        channel.confirmSelect();
        // 3. Declare the queue
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
        // 4. Publish and wait for the confirm of every single message.
        //    Primitive long avoids needless boxing of the timestamps.
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "hello publisher confirms"+i;
            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
            // Wait for the confirm of the message just published
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
    }
}
(2)Publishing Messages in Batches(批量确认)
/**
 * Publisher-confirm strategy 2: publish messages in batches of 100 and wait for
 * confirms once per batch, plus once more for a trailing partial batch.
 * Prints the message count and elapsed time.
 *
 * @throws Exception if connecting, declaring or publishing fails, or if a confirm
 *                   wait ({@code waitForConfirmsOrDie(5000)}) does not succeed
 */
private static void publishingMessagesInBatches() throws Exception {
    try (Connection conn = createConnection()) {
        // Open a channel and switch it to confirm mode
        Channel ch = conn.createChannel();
        ch.confirmSelect();

        // Declare the target queue
        ch.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);

        // Publish, confirming once per full batch
        final int batchSize = 100;
        final long startedAt = System.currentTimeMillis();
        int pending = 0; // messages published since the last confirm wait
        for (int n = 0; n < MESSAGE_COUNT; n++) {
            byte[] payload = ("hello publisher confirms" + n).getBytes();
            ch.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, payload);
            pending++;
            if (pending != batchSize) {
                continue;
            }
            ch.waitForConfirmsOrDie(5000);
            pending = 0;
        }

        // Confirm whatever is left over from an incomplete final batch
        if (pending > 0) {
            ch.waitForConfirmsOrDie(5000);
        }

        final long finishedAt = System.currentTimeMillis();
        System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, finishedAt - startedAt);
    }
}
(3)Handling Publisher Confirms Asynchronously(异步确认)
/**
 * Publisher-confirm strategy 3: publish all messages without blocking and track
 * confirms through a {@code ConfirmListener}. Outstanding sequence numbers are
 * kept in a synchronized sorted set; the method returns once the set is empty,
 * after printing the message count and elapsed time.
 *
 * @throws Exception if connecting, declaring or publishing fails, or if the
 *                   thread is interrupted while waiting for outstanding confirms
 */
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
    try(Connection connection = createConnection()) {
        // 1. Open the channel
        Channel channel = connection.createChannel();
        // 2. Put the channel into confirm mode
        channel.confirmSelect();
        // 3. Declare the queue
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
        // 4. Listen for confirms.
        //    The set holds the sequence numbers of messages not yet confirmed.
        long start = System.currentTimeMillis();
        SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    // multiple == true: drop every entry up to and including deliveryTag
                    confirmSeqNo.headSet(deliveryTag+1).clear();
                } else {
                    confirmSeqNo.remove(deliveryTag);
                }
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    confirmSeqNo.headSet(deliveryTag+1).clear();
                }else {
                    confirmSeqNo.remove(deliveryTag);
                }
                // Real code should handle the nack according to the business
                // scenario (e.g. re-publish); omitted here.
            }
        });
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "hello publisher confirms"+i;
            long seqNo = channel.getNextPublishSeqNo();
            // Record the sequence number BEFORE publishing. The confirm is handled
            // on another thread; if it arrived before the add, the listener would
            // find nothing to remove, the entry would be added afterwards and never
            // cleared, and the wait loop below would spin forever.
            confirmSeqNo.add(seqNo);
            channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
        }
        // Poll until every published message has been acked or nacked
        while (!confirmSeqNo.isEmpty()) {
            Thread.sleep(10);
        }
        long end = System.currentTimeMillis();
        System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
    }
}
消息数越多, 异步确认的优势越明显
小结:1-5工作模式重点学习 6-7工作模式了解即可