RabbitMQ实现延时队列
前言
我们在设计大型的购物类的网站的时候,有这样的一个场景就是我们下订单后如果长时间不去支付的话需要超时将订单取消,并且存库要恢复,这就是我们经常说的订单过期会库存。
分布式秒杀系统的设计可参考我的博客:分布式秒杀系统的设计
SpringBoot项目中使用RabbitMQ可参考我的博客:SpringBoot项目中使用RabbitMQ
正文
延时队列
订单过期库存回库的场景:
- 淘宝七天自动确认收货:在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家。
- 12306 购票支付确认页面:我们在选好票再完成支付之前内部都是有倒计时的,如果 30 分钟内订单不确认支付的话将会自动取消订单。
解决订单过期库存回库的潜在方案:
- 使用后台线程不断扫描数据库,性能极低,弃用
- 将订单数据存入
Redis
中并设置失效时间,考虑到要保留订单信息,弃用 - 使用
delayedQueue
延时队列,设置到期时间,其中的对象只能在到期时才能从队列中取走,进行过期操作,但是不支持分布式,弃用 - 使用分布式定时任务去处理相关订单,但是时间间隔难以设置,并且数据量巨大,慎用
- 使用
RabbitMQ
延迟队列来实现,并使用定时任务处理可能的消息丢失导致的库存无法释放,采用
RabbitMQ延时队列
RabbitMQ延时队列的实现
- 在
RabbitMQ 3.6.x
之前我们一般采用死信队列+TTL过期时间
来实现延迟队列。 - 在
RabbitMQ 3.6.x
开始,RabbitMQ
官方提供了延迟队列的插件,可以下载放置到RabbitMQ
根目录下的plugins
下。
以RabbitMQ 3.6.6为例安装延时插件
第一步:下载RabbitMQ延时队列的插件
- 官方提供的插件下载地址
- 3.6.6版本的延时插件下载地址
第二步:将放置对应的plugins的目录下
cd /rabbitmq/lib/rabbitmq_server-3.6.6/plugins
第三步:启动插件
whereis rabbitmq
cd /usr/local/rabbitmq/bin
sh rabbitmq-plugins enable rabbitmq_delayed_message_exchange
第四步:重起RabbitMQ服务器使插件生效
service rabbitmq-server restart
或者
/usr/local/rabbitmq/bin/rabbitmqctl stop
/usr/local/rabbitmq/bin/rabbitmq-server -detached
第五步:查看插件是否正常安装
sh rabbitmq-plugins list
SpringBoot使用RabbitMQ实现延时队列
依赖
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties:springboot配置类
##配置rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#自定义参数:并发消费者的初始化值
spring.rabbitmq.listener.concurrency=10
#自定义参数:并发消费者的最大值
spring.rabbitmq.listener.max-concurrency=20
#自定义参数:每个消费者每次监听时可拉取处理的消息数量
spring.rabbitmq.listener.prefetch=5
#字符串型的消息
string.queue.name=string.queue.name2
string.exchange.name =string.exchange.name2
string.routing.key.name=string.routing.key.name2
RabbitmqConfig.class:rabbitmq配置类
-
directExchange.setDelayed(true)
:交换器开启支持延迟
/**
* rabbitmq配置类
*/
@Configuration
public class RabbitmqConfig {
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者,即为队列消息
*
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* RabbitTemplate工具类
* RabbitTemplate中会使用通道
* 通道是多路复用的双向数据通道,可以减少TCP连接
*
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
/**生成rabbitmq组件
* 队列:用来存储消息的数据结构,位于硬盘或内存中。
* 交换机:接收发送到RabbitMQ中的消息并决定把他们投递到那个队列的组件;
* 绑定:一套规则,用于告诉交换器消息应该被存储到哪个队列。
*
* **/
/**
* 字符型消息
**/
//定义队列
@Bean(name = "stringQueue")
public Queue stringQueue() {
return new Queue(env.getProperty("string.queue.name"), true);
}
//定义交换机 DirectExchange为队列消息交换器
@Bean
public DirectExchange stringExchange() {
DirectExchange directExchange =new DirectExchange(env.getProperty("string.exchange.name"), true, false);
directExchange.setDelayed(true);
return directExchange;
}
//定义绑定
@Bean
public Binding stringBinging() {
return BindingBuilder.bind(stringQueue()).to(stringExchange()).with(env.getProperty("string.routing.key.name"));
}
}
CommonMqListener.class:RabbitMQ消费者
/**
* 这里是消息队列的消费者
*/
@Component
@Slf4j
public class CommonMqListener {
@Autowired
RedisTemplate redisTemplate;
/**
* 监听消费消息
*
* @param message
*/
@RabbitListener(queues = "${string.queue.name}", containerFactory = "singleListenerContainer")
public void consumeStringQueue(@Payload byte[] message) {
try {
log.info("监听消费,当前时间"+ DateTimeUtils.getSystemTime() +" 监听到消息: {} ", new String(message, "UTF-8"));
} catch (Exception e) {
log.error("监听消费订单消息消息异常{},{}",e.getStackTrace(), e);
}
}
}
AppForTest.class:测试类
-
message.getMessageProperties().setDelay(60000)
:设置1分钟的延迟,默认时间单位为毫秒
@RunWith(SpringJUnit4ClassRunner.class)
//启动Spring
@SpringBootTest(classes = App.class)
public class AppForTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@Test
public void ceshi(){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("string.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("string.routing.key.name"));
try {
String msg="发送信息:当前时间"+ DateTimeUtils.getSystemTime();
Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")).build();
//设置请求编码格式
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
//设置60秒种的延时
message.getMessageProperties().setDelay(60000);
rabbitTemplate.convertAndSend(message);
} catch (Exception e) {
throw new BusinessException(CouponTypeEnum.OPERATE_ERROR,"字符串型消息生产者发送消息失败:" + e.getMessage());
}
}
}
最终演示结果:
- 生产者发送消息后,一分钟后被消费者接收并消费