0
点赞
收藏
分享

微信扫一扫

Redis实现延迟消息队列

天涯学馆 2021-12-29 阅读 65

消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ。

  • 如果只需要实现简单的消息队列,那么借助Redis即可。
  • 如果对消息有着严格的可靠性等要求,那么建议使用专业的MQ.(RocketMQ,Kafka,RabbitMQ)‘

Redis实现延迟消息队列的思想

可以借助zset有序集合来实现延迟消息队列。因为zset有一个score,它是可以按这个score来进行排序的,我们可以把时间戳作为zset的score,让它按时间去排序,然后在Java程序中使用轮询或者定时任务来消费里面的消息。这里使用的是点对点的消费模式。
以下是代码的展示.
第一步,先创建一个发送消息的对象

package com.xjm.redis;

public class RedisMessage {
    private String id;
    private Object message;

    @Override
    public String toString() {
        return "RedisMessage{" +
                "id='" + id + '\'' +
                ", message=" + message +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getMessage() {
        return message;
    }

    public void setMessage(Object message) {
        this.message = message;
    }
}

第二步,引入序列化工具jackson

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.3</version>
</dependency>

第三步,编写一个消息队列的工具类,主要包含消息入队和消费功能

package com.xjm.redis;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Set;
import java.util.UUID;

/**
 * 延迟消息队列
 */
public class DelayMsgQueue {
    private Jedis jedis;
    private String queue;

    public DelayMsgQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * 消息入队,要发送的消息
     * @param message
     */
    public void queue(Object message){
         RedisMessage redisMessage = new RedisMessage();
         redisMessage.setId(UUID.randomUUID().toString());
         redisMessage.setMessage(message);
        try {
            //序列化
            String s = new ObjectMapper().writeValueAsString(redisMessage);
            System.out.println("Redis发送消息:"+new Date());
            //消息发送,score延迟5秒
            jedis.zadd(queue,System.currentTimeMillis()+5000,s);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 消息出队(消息消费)
     *
     */
    public void loop(){
        /**
         * 轮询,线程被中断时停止
         */
        while (!Thread.interrupted()){
            //读取score时间在0到当前时间戳的之间的消息,一次一条
            Set<String> message = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            if (message.isEmpty()){
                try {
                    //如果消息为空,则线程休眠一段时间
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            //如果读取到了消息,则直接加载
            String next = message.iterator().next();
            if(jedis.zrem(queue,next)>0){
                //抢到了,接下来处理业务
                try {
                    RedisMessage redisMessage = new ObjectMapper().readValue(next, RedisMessage.class);
                    System.out.println("抢到了!"+new Date());
                    System.out.println(redisMessage.toString());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

第四步,测试

package com.xjm.redis;


public class DelayMsgTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.exeute(jedis -> {
            //构造一个消息队列
            DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "jaymin-delay-queue");
            Runnable producer = new Runnable() {
                @Override
                public void run() {
                    for (int i=0;i<5;i++){
                        delayMsgQueue.queue("Java>>>>>"+i);
                    }
                }
            };
            Thread producerThread = new Thread(producer);
            Runnable customer = new Runnable() {
                @Override
                public void run() {
                    delayMsgQueue.loop();
                }
            };
            Thread customerThread = new Thread(customer);
            producerThread.start();
            customerThread.start();
            try {
                //休息7秒后,停止程序
                Thread.sleep(7000);
                customerThread.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });

    }
}

测试结果如下:


在此鸣谢@江南一点雨,感谢他的redis视频解析.

举报

相关推荐

0 条评论