0
点赞
收藏
分享

微信扫一扫

CMQ——多线程实现自动拉取消息


何为CMQ?

        腾讯云消息队列(Cloud Message Queue,CMQ)是一种分布式消息队列服务,它能够提供可靠的基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)之间的收发消息,存储在可靠有效的 CMQ 队列中,防止消息丢失。 CMQ 支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。——来源以及更多内容推荐看​​官方文档​​。

       之前公司内部使用rabbitMQ,但是运维调整部署全部迁移到腾讯云上,如果继续使用rabbitMQ,还需要运维自主去搭建环境,维护之类,而且经考察对rabbitMQ维护成本相比直接使用腾讯云的CQM高很多,所以最近技术部门对CMQ进行研究发现基本可以替代rabbitMQ,但是同时也发现一个比较严重的问题,使用cmq的mq功能,无法实现完全实现自动触发消息消费,因为cmq的消息监听基于长连接的,长时间没有消息推送会造成长连接断开,无法实现自动触发消息消费了。本文目的主要解决CQM自动触发消息消费。

利用spring中可以根据注解获取bean,调用对应通知方法,实现多线程自动拉取消息。

自定义注解Queue

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface IzkQueue {
String value() default "";

String queueName() default "";
}

消息处理器抽象统一接口

/**
* 消息处理器抽象统一接口
*/
public interface IBaseCmqHandler {

/**
* 处理从cmq中获取的消息
*
* @param queueName : 队列名
* @param message : 消息体
* @return
*/
boolean onMessage(String queueName, Message message);
}

CMQ消息监听类

@Slf4j
@Component
public class CmqListener implements ApplicationContextAware, ApplicationListener<ApplicationEvent> {

@Setter
private ApplicationContext applicationContext;
@Autowired
private TaskExecutor taskExecutor;

private boolean isStart = false;

/**
* 获取所有的需要监听mq的类,以及注册的mq
* @param applicationEvent
*/
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
HashMap<String, IBaseCmqHandler> map = new HashMap<>(16);
Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(IzkQueue.class);
beanMap.forEach((key, value) -> {
IzkQueue annotation = value.getClass().getAnnotation(IzkQueue.class);
String queue = annotation.queueName();
map.put(queue, (IBaseCmqHandler) value);
});
if (!isStart) {
isStart = true;
if (!CollectionUtils.isEmpty(map)) {
taskExecutor.execute(() -> executeQueueHandler(map));
}
}
}

private void executeQueueHandler(HashMap<String, IBaseCmqHandler> map) {
map.forEach((queueName, bean) -> {
taskExecutor.execute(() -> receiveCmqMessage(queueName, bean));
});

}

/**
* 功能描述 : 将队列与对应的消息处理器进行匹配,并进行消息消费
*
* @param queueName : queue name
* @param cmqHandler : 具体的消息处理器
* @return
* @created 2019-07-14 16:55
*/
private void receiveCmqMessage(String queueName, IBaseCmqHandler cmqHandler) {
try {
while (true) {
// 睡眠 释放cpu资源
Thread.sleep(10);
CmqQueue cmqQueue = applicationContext.getBean(queueName, CmqQueue.class);
Message message = cmqQueue.receiveMsg();

if (null != message) {
log.info("时间:{}, 队列:{}, 收到消息:{}", LocalDateTime.now(), queueName, message.msgBody);
if (!StringUtils.isEmpty(message.msgBody) && !StringUtils.isEmpty(message.receiptHandle)) {
taskExecutor.execute(() -> {
try {
// 处理消息
if (cmqHandler.onMessage(queueName, message)) {
// 消费成功 删除消息
cmqQueue.deleteMsg(message.receiptHandle);
} else {
taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
}
} catch (Exception e) {
log.error("消息处理失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);
taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
}
});
}
}
}
} catch (Exception e) {
log.error("消息执行失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);
taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
}
}

}

关于上述涉及到类CmqQueue是公司内部封装类,将queue队列和cmq的账号绑定,只是大概展示一下,仅供参考。

账号信息类

@Data
public class MqAccount {
private String host;
private String port;
private String username;
private String password;
private String vhost;
private String secretId;
private String secretKey;
private String endpoint;
private String queueEndpoint;
}

CmqQueue的信息类

public class CmqQueue extends AbstractMq {
private static final Logger LOGGER = LoggerFactory.getLogger(CmqQueue.class);
private Account account;
private Queue queue;

public CmqQueue(MqAccount mqAccount, String queueName) {
mqAccount = (MqAccount)Preconditions.checkNotNull(mqAccount);
Preconditions.checkNotNull(queueName);
queueName = this.getNameWithSuffix(queueName);
this.init(mqAccount, queueName);
}

private void init(MqAccount mqAccount, String queueName) {
this.account = new Account(mqAccount.getQueueEndpoint(), mqAccount.getSecretId(), mqAccount.getSecretKey());
ArrayList list = Lists.newArrayList();

try {
this.account.listQueue(queueName, -1, -1, list);
long count = list.stream().filter((name) -> {
return queueName.equalsIgnoreCase(name);
}).count();
if (count == 0L) {
QueueMeta meta = new QueueMeta();
this.account.createQueue(queueName, meta);
} else {
LOGGER.warn("cmq queueName {} has exist", queueName);
}

this.queue = this.account.getQueue(queueName);
} catch (Exception var7) {
LOGGER.error("cmq createQueue error", var7);
throw new RuntimeException(var7);
}
}

public void setQueueAttr(QueueMeta meta) {
try {
this.queue.setQueueAttributes(meta);
} catch (Exception var3) {
LOGGER.error("cmq setQueueAttr error", var3);
}

}

public String sendMsg(String msg) {
try {
return this.queue.sendMessage(msg);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},sendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
return null;
}
}

public List<String> batchSendMsg(List<String> msgs) {
try {
return this.queue.batchSendMessage(msgs);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},batchSendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
return null;
}
}

public Message receiveMsg() {
Message message = null;

try {
message = this.queue.receiveMessage(10);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},receiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
}

return message;
}

public List<Message> batchReceiveMsg(int numOfMsg) {
try {
return this.queue.batchReceiveMessage(numOfMsg, 10);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},batchReceiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
return null;
}
}

public void deleteMsg(String receiHandle) {
try {
this.queue.deleteMessage(receiHandle);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},deleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
}

}

public void batchDeleteMsg(List<String> receiHandles) {
try {
this.queue.batchDeleteMessage(receiHandles);
} catch (Exception var3) {
LOGGER.error("cmq queuename:{},batchDeleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
}

}
}

public abstract class AbstractMq {
protected String exchangeName;
protected String exchangeType = "topic";

public AbstractMq() {
}

protected String getExchangeType() {
return this.exchangeType;
}

protected String getNameWithSuffix(String name) {
return !DeveloperUtil.isLocalDebug() ? name + "_" + Util.runEvn : name + "_local";
}
}

Demo案例

@IzkQueue(queueName = "queueDemo",value = "demo")
public class MessageDemo implements IBaseCmqHandler {
@Override
public boolean onMessage(String queueName, Message message) {
//todo
return false;
}
}

总结

       不将就是发现的原动力,多思考多动手。       

举报

相关推荐

0 条评论