0
点赞
收藏
分享

微信扫一扫

springboot rocketmq 延时消息、延迟消息


rocketmq也有延迟消息,经典的应用场景:订单30分钟未支付,则取消的场景

springboot rocketmq 延时消息、延迟消息_spring boot


其他博客提到从rocketmq5.0开始,支持自定义延迟时间,4.x只支持预定义延迟时间,安装rocketmq可参考RocketMq简介及安装、docker安装rocketmq、安装rocketmq可视化管理端

引入rocketmq5.x对应的客户端

引入支持rocketmq5.x的rocketmq-spring

<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>2.2.3</version>
 </dependency>

application.yml配置

rocketmq:
  name-server: ip:9876

生产者

延迟方法有2种,

  • 延迟x时间执行,
  • syncSendDelayTimeMills ,延迟时间单位是毫秒、
  • syncSendDelayTimeSeconds,延迟时间单位是秒
  • 指定未来的某个时间点执行
  • syncSendDeliverTimeMills,某个时间点的时间戳

在实践中,我选择了延迟x时间执行,因为在测试时发现,如果应用宿主机的时间与rocketmq宿主机的时间不同步时会出现问题,比如应用宿主机时间比rocketmq宿主机时间晚了2分钟,发送了个未来2分钟执行的消息,因为宿主机时间差原因,可能会刚发送就会立刻执行。

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
 @Autowired
 private RocketMQTemplate rocketMQTemplate;
 pubic void sendDelayMsg(){
 	//延迟时间,单位毫秒
     Integer delay = 5000;
     String msgBody= JSONObject.toJSONString(someObject);
	 rocketMQTemplate.syncSendDelayTimeMills(
	              "myTopic:myTag",
	                msgBody,
	                delay);
}

消费者

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
@RocketMQMessageListener(topic = "myTopic",
        selectorExpression = "myTag",
        consumerGroup = "myConsumerTag")
public class MyMsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String msgBody) {
        log.info("收到的消息是:{}", msgBody);
        final SomeClass someObj = JSONObject.parseObject(msgBody, SomeClass.class);
       
    }

}


举报

相关推荐

0 条评论