0
点赞
收藏
分享

微信扫一扫

RocketMQ 与 Spring Boot整合(五、广播消费)

朱小落 2021-10-09 阅读 74
RocketMQMQ

在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

下面,我们开始本小节的示例。

5.1 Demo05Message

package com.ebadagang.springboot.rocketmq.message;

/**
* 示例 05 的 Message 消息
*/

public class Demo05Message {

public static final String TOPIC = "DEMO_05";

/**
* 编号
*/

private Integer id;

public Demo05Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo05Message{" +
"id=" + id +
'}';
}

}

5.2 Demo05Producer

创建 [Demo04Producer]类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现同步发送消息。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo05Producer {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public SendResult syncSend(Integer id) {
// 创建 Demo05Message 消息
Demo05Message message = new Demo05Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo05Message.TOPIC, message);
}

}

5.3 Demo05Consumer

创建 [Demo05Consumer]类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
topic = Demo05Message.TOPIC,
consumerGroup = "demo05-consumer-group-" + Demo05Message.TOPIC,
messageModel = MessageModel.BROADCASTING // 设置为广播消费
)

public class Demo05Consumer implements RocketMQListener<Demo05Message> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void onMessage(Demo05Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}
  • 主要是 @RocketMQMessageListener 注解,通过设置了 messageModel = MessageModel.BROADCASTING ,表示使用广播消费

5.4 简单测试

创建 [Demo05ProducerTest]测试类,用于测试广播消费。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo05ProducerTest {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private Demo05Producer producer;

@Test
public void test() throws InterruptedException {
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}

@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);

// 阻塞等待,保证消费
new CountDownLatch(1).await();
}

}

5.4.1 首先

执行#test()测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05" 的 Consumer 节点。

5.4.2 然后

执行#testSyncSend()测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05"Consumer节点。同时,该测试方法,调用 Demo05ProducerTest#syncSend(id)方法,同步发送了一条消息。控制台输出如下:

5.4.3 #testSyncSend() 方法对应的控制台

# Producer 同步发送消息成功
2020-08-04 21:56:34.739 INFO 10824 --- [ main] c.e.s.r.producer.Demo05ProducerTest : [testSyncSend][发送编号:[1596549394] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A2A4818B4AAC2142870A50000, offsetMsgId=6585E30D00002A9F0000000000039CC7, messageQueue=MessageQueue [topic=DEMO_05, brokerName=broker-a, queueId=0], queueOffset=0]]]
# Demo05Consumer 消费了该消息
2020-08-04 21:56:34.771 INFO 10824 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][线程编号:174 消息内容:Demo05Message{id=1596549394}]

5.4.4 #test() 方法对应的控制台

# 另外一个 Demo05Consumer 也消费了该消息
2020-08-04 21:56:34.755 INFO 15504 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][线程编号:184 消息内容:Demo05Message{id=1596549394}]

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

举报

相关推荐

0 条评论