延迟队列可以通过设置生产者发送设置有ttl时间的信息给队列来实现,但这种方式中的消息可能不会按时成为死信,而且rabbitmq每次只能检测第一个消息是否过期,如果过期就丢入死信队列,第一个消息延迟时间很长,第二个消息延迟时间很短,第二个消息并不会先执行。
如:若先发一个ttl为40s的消息,然后发一个ttl为10s的消息,ttl为10s的消息并不会先被执行,而是要等ttl为40s的消息消费完后,再消费ttl为10s的消息。
延迟交换机:类型为:x-delayed-message,消息发送给延迟交换机,交换机不会立马发送给队列,而是存储在一个分布式数据系统表中,等到达到投递时间,便会发送给队列。
案例:生产者先发送ttl为40s的消息,然后发送ttl为10s的消息给延迟交换机,如下图:

1、安装延迟队列插件,找到对应rabbitmq安装目录plugins下,执行如下命令,并重启rabbitmq服务。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange出现x-delayed-message即安装成功

2、创建获取rabbitmq服务连接工具类,启动服务器,关闭linux上防火墙
//连接工厂,创建信道工具类
public class RabbitUtils {
    // 得到一个连接的 channel
    public static Channel getChannel() throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.23.129");
        factory.setUsername("user");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
3、生产者发送延迟消息
(1)调用RabbitUtils工具类获取信道;
(2)调用exchangeDeclare(交换机名,类型(此处为x-delayed-message),是否持久化,是否自动删除,其他参数)声明延迟交换机,其他参数要设置x-delayed-type属性,设置改自定义交换机支持direct型;
(3)调用basicPublish(交换机名,路由,其他参数,消息体)方法发送消息,通过设置其他参数中的headers中x-delay属性,来设置延迟消息;
(4)获取发送消息后本地时间。
public class Producer {
    //延迟交换机
    private final static String DELAY_EXCHANGE="delay_exchange";
    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitUtils.getChannel();
        //声明延迟交换机
        Map<String,Object> argument=new HashMap<>();
        //设置改交换机为direct
        argument.put("x-delayed-type","direct");
        channel.exchangeDeclare(DELAY_EXCHANGE,"x-delayed-message",true,false,argument);
        //发送ttl为40秒的消息
        Map<String,Object> map=new HashMap<>();
        map.put("x-delay","40000");
        AMQP.BasicProperties properties=
                new AMQP.BasicProperties().
                        builder().headers(map).build();
        String message="该消息的ttl时间为40秒";
        channel.basicPublish(DELAY_EXCHANGE,"delay",properties,message.getBytes("UTF-8"));
        System.out.println("第一条消息:"+message+",发送成功,当前时间为:"
                + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
        //发送ttl为10秒的消息
        map.put("x-delay","10000");
        AMQP.BasicProperties properties1=
                new AMQP.BasicProperties().
                        builder().headers(map).build();
        String message1="该消息的ttl时间为10秒";
        channel.basicPublish(DELAY_EXCHANGE,"delay",properties1,message1.getBytes("UTF-8"));
        System.out.println("第二条消息:"+message1+",发送成功,当前时间为:"
                + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
    }
}4、消费者接收消息
(1)调用工具类获取信道,声明延迟队列,该队列不设置ttl时间;
(2)绑定延迟队列与延迟交换机;
(3)调用basicConsumer(队列名,是否自动应答,成功回调函数,失败回调函数)方法接收延迟消息,并获取本地时间。
public class Consumer {
    //延迟交换机
    private final static String DELAY_EXCHANGE="delay_exchange";
    //延迟队列
    private final static String DELAY_QUEUE="delay_queue";
    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitUtils.getChannel();
        //声明延迟队列
        channel.queueDeclare(DELAY_QUEUE,false,false,false,null);
        //绑定延迟队列与延迟交换机
        channel.queueBind(DELAY_QUEUE,DELAY_EXCHANGE,"delay");
        //接受消息成功回调函数
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8")
                    +",当前时间为:"+new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
        };
        //接收消息
        channel.basicConsume(DELAY_QUEUE,true,deliverCallback,consumerTag->{});
    }
}5、测试,生产者先发送ttl为40s的下洗,然后发送ttl为10s的消息,结果如下:
 
 
 
 
ttl为10s的先被消费者消费,测试成功。










