0
点赞
收藏
分享

微信扫一扫

RabbitMQ:工作队列模式

1.基本介绍

在这里插入图片描述

在这里插入图片描述

2.轮询发送消息

2.1抽取工具类

public class ConnectUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接参数
        //服务器IP地址
        factory.setHost("192.168.88.133");
        //连接端口
        factory.setPort(5672);
        //设置连接的虚拟机名称
        factory.setVirtualHost("/myhost");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //2.创建Connection对象
        Connection connection = factory.newConnection();
        return connection;
    }

    /**
     * 创建信道对象
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        return channel;

    }

}

2.2 生产者

public class Producer {
    static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) {
                String msg="hello rabbitmq!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            }
            System.out.println("消息已经发送完毕");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.3消费者

消费者1

public class Consumer1 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));

                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

消费者2

public class Consumer2 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));

                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

2.4测试

为了方便测试,我们需要先启动消费者,然后再启动生产者,不然生产者发送的消息会瞬间被某个消费者消费完
在这里插入图片描述
在这里插入图片描述

3.消息应答

3.1消息应答基本介绍

👀我们知道消费者完成一个任务是需要一定的时间的,如果消费者在处理一个长任务的时候,当它只处理一部分但是此时消费者却挂掉了,可能会出现下面的情况:
👀如果说RabbitMQ向消费者传递一条消息以后,不管消费者有没有处理完或者有没有接收到,就马上把消息标记为删除,那么,如果这个时候消费者挂掉了,就会导致丢失当前正在处理的消息,以及后续发送给消费者的消息,因为消费者不能接收到。
👀为了保证消息在发送过程中不会丢失,RabbitMQ引入了消息应答机制,消息应答就是消费者在接收到消息并且处理该消息以后,告诉RabbitMQ它已经处理了,RabbitMQ就可以把这个消息从消息 队列中删除了。

3.2消息自动应答

  • 消息发送后就马上认为已经传递成功了,这种模式需要在高吞吐量和数据传输安全性方面做权衡。因为如果使用这种模式,如果消息在被接收到之前,消费者那么出现连接或者信道关闭,那么消息就会丢失;不过,对于这种模式来说,消费者那里可以传递过载的消息,没有对传递的消息数量进行限制,这样就可能使得消费者这边因为接收了太多还来不及处理的消息,导致消息积压,最后使得内存耗尽,导致这些消费者线程被操作系统杀死,所以这种模式仅仅适用消费者可以高效并且以某种苏联能够处理这些消息的情况下使用。
  • 信息过载:是指社会信息超过了个人或系统所能接受、处理或有效利用的范围,并导致故障的状况。

3.3消息手动应答

  • 消费者从队列中消费消息,默认采用的是自动应答,自动应答可能导致消息没有完全消费而导致消息失效问题,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。而且,使用手动应答可以批量应答减少网络拥堵,下面三个方法可以用于手动应答消息:
  • Channel。basicAck():用于肯定确认,RabbitMQ已经知道消息被消费并且成功处理消息,可以把消息丢弃。
  • Channle.basicNack():用于否定确认
  • Channel.basicReject():用于否定确认,不处理该消息直接拒绝,然后把消息丢弃

3.4批量确认(Multiple)

3.5消息自动重新入队

如果一个消费者死了(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送一个ack,RabbitMQ就会明白一条消息没有被完全处理,并会重新排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。通过这种方式,您可以确保即使消费者偶尔死亡,也不会丢失任何消息。
image.png

3.6消息手动应答代码

生产者

public class Producer2 {
    static final String QUEUE_NAME="ack_work_queue";
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) {
                String msg="你好,小兔子!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            }
            System.out.println("消息已经发送完毕");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者1

public class Consumer3 {
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者1-消费消息的时间比较短。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   //睡眠一秒
                    SleepUtil.sleep(1);
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

消费者2

public class Consumer4 {
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者2-消费消息的时间比较长。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   //睡眠一秒
                    SleepUtil.sleep(50);
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

3.7消息手动应答效果

第一次测试,两个消费者都睡眠1秒
image.png
image.png
image.png
第二次测试,让消费者2睡眠30秒,然后观察两个消费者的消费情况,
image.png
image.png
image.png
接着把消费者2停掉,再次观察消费者1控制台打印的消息,发现队列中没有被消费的消息重新进入到队列中,并且被消费者1进行消费
image.png

4.消息的持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在 RabbitMQ 节点重新启动后仍能存活下来。为此,我们需要将其声明为_持久:_
如果我们之前创建的队列是非持久化的,如果RabbitMQ重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable参数设置为持久化;

4.1队列持久化

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

image.png

4.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

4.3公平调度

image.png

int prefetchCount = 1;
channel.basicQos(prefetchCount);
举报

相关推荐

0 条评论