yml配置
# Spring Boot RabbitMQ connection, publisher and listener settings.
spring:
  rabbitmq:
    host: offline-tech.com
    port: 30061
    # NOTE(review): credentials are hard-coded here; consider externalizing them.
    username: admin
    password: admin
    connection-timeout: 5000ms
    # NOTE(review): superseded by publisher-confirm-type (below) in newer
    # Spring Boot versions; kept for older versions - confirm which one applies.
    publisher-confirms: true
    # Ask the broker to return messages that cannot be routed to any queue.
    publisher-returns: true
    listener:
      simple:
        # Listeners must ack/reject every delivery explicitly via the Channel.
        acknowledge-mode: manual
        retry:
          enabled: true
    # Correlated confirms: the ConfirmCallback receives the CorrelationData of each send.
    publisher-confirm-type: correlated
RabbitMQ 配置类
RabbitMQ 配置文件,初始化队列、exchange、死信队列等
package com.kunchi.auditcenter.facade.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ topology used by the audit center: the approval-code
 * queue with its topic exchange and binding, plus the dead-letter queue,
 * exchange and binding that the approval-code queue points at.
 */
@Configuration
public class RabbitQueueConfig {

    /**
     * TTL passed to {@code QueueBuilder.ttl(...)} for the approval-code queue:
     * 4 hours, expressed in milliseconds.
     */
    private static final int ORDER_QUEUE_TTL = 4 * 3600 * 1000;

    /**
     * Builds the {@link RabbitTemplate} used for publishing, serializing payloads as JSON.
     *
     * @param connectionFactory the connection factory to publish through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        // Without the mandatory flag the broker silently drops unroutable messages,
        // so a registered ReturnCallback would never be invoked.
        template.setMandatory(true);
        return template;
    }

    /**
     * Declares the durable approval-code queue. Messages that expire or are
     * rejected without requeue are dead-lettered to the dead exchange using
     * the dead routing key.
     *
     * @return the approval-code queue definition
     */
    @Bean
    public Queue approvalCodeQueue() {
        return QueueBuilder.durable(RabbitConstants.APPROVAL_CODE_QUEUE)
                .ttl(ORDER_QUEUE_TTL)
                // Routing key used when a message is dead-lettered.
                .deadLetterRoutingKey(RabbitConstants.DEAD_ROUTE_KEY)
                // Exchange that receives dead-lettered messages.
                .deadLetterExchange(RabbitConstants.DEAD_EXCHANGE)
                .build();
    }

    /**
     * @return the topic exchange that approval-code messages are published to
     */
    @Bean
    public TopicExchange fanweiTopicExchange() {
        return new TopicExchange(RabbitConstants.FANWEI_EXCHANGE);
    }

    /**
     * @return the binding of the approval-code queue to the topic exchange
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(approvalCodeQueue())
                .to(fanweiTopicExchange())
                .with(RabbitConstants.APPROVAL_CODE_ROUTE_KEY);
    }

    /**
     * @return the durable dead-letter queue
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(RabbitConstants.DEAD_QUEUE, true);
    }

    /**
     * @return the topic exchange that receives dead-lettered messages
     */
    @Bean
    public TopicExchange deadExchange() {
        return new TopicExchange(RabbitConstants.DEAD_EXCHANGE);
    }

    /**
     * @return the binding of the dead-letter queue to the dead exchange
     */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with(RabbitConstants.DEAD_ROUTE_KEY);
    }
}
消息生产者
//定义一个类,发送消息,接收消息,并进行消息确认回调
@Slf4j
@Service
public class MqProduceServiceImpl implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback, FanweiMqProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setCallback() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void sendWorkflowRequest(WorkflowRequestMessage message) {
CorrelationData correlationData = new CorrelationData(String.valueOf ( message.getId () ));
rabbitTemplate.convertAndSend( RabbitConstants.FANWEI_EXCHANGE, RabbitConstants.APPROVAL_CODE_ROUTE_KEY, message, correlationData);
}
//Exchange 到Queue投递成功,不回调ReturnCallback
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息丢失, 没有投递成功:"+message.getBody());
String msg = new String(message.getBody());
WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( exchange)
.queue (routingKey).message ( JSON.toJSONString ( requestMessage ))
.error ( replyText).requstId (requestMessage.getRequestId () ).build();
logMqRequestResumeService.save ( log );
}
// confirm确认消息投递成功了
//如果没有 Exchange ack=false,
// * 如果有 Exchange ack=tru
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功,发送ack确认,id="+correlationData.getId());
if (ack){
log.info("发送成功");
}else {
String id = correlationData.getId ();
ApprovalDataRecordOaPO record = approvalDataRecordOaRepositoryService.getById ( Long.valueOf ( id ) );
WorkflowRequestMessage requestMessage = new WorkflowRequestMessage ( record.getApprovalOaId (), record.getCreateBy (), record.getId () );
LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( RabbitConstants.FANWEI_EXCHANGE )
.queue (RabbitConstants.APPROVAL_CODE_QUEUE).message ( JSON.toJSONString ( requestMessage ))
.error ( cause).requstId (requestMessage.getRequestId () ).build();
logMqRequestResumeService.save ( log );
}
}
}
消息消费者
@Slf4j
@Service
public class FanweiConsumer {
@Autowired
private FanweiOaService fanweiOaService;
@Autowired
private LogMqRequestResumeService logMqRequestResumeService;
@RabbitListener (queues = RabbitConstants.APPROVAL_CODE_QUEUE )
@RabbitHandler
public void approvalCode(Message message, Channel channel) throws IOException {
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
String msg = new String(message.getBody());
WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
log.info ("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
try {
log.info("收到消息:{}", msg);
//TODO 具体业务
fanweiOaService.getApprovalCodeAndUpdate ( requestMessage );
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.info ( e.getMessage (),e );
requestMessage.setError ( e.getMessage () );
dealFailureMessage ( message, channel, requestMessage);
}
}
/**
* 处理失败消息, 达到最大重试次数后,错误信息存入数据表
* @param message
* @param channel
* @param requestMessage
* @throws IOException
*/
private void dealFailureMessage(Message message, Channel channel, WorkflowRequestMessage requestMessage) throws IOException {
log.info("消息即将再次返回队列处理..."+requestMessage.getRetryCount ());
if (requestMessage.getRetryCount ()>=FanweiOaConst.MAX_RETRY){
channel.basicReject (message.getMessageProperties().getDeliveryTag(), false);
} else {
requestMessage.setRetryCount ( requestMessage.getRetryCount ()+1 );
log.info("消息重新发送..."+requestMessage.getRetryCount ());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(requestMessage));
}
}
// 监听死信队列
@RabbitListener(queues = RabbitConstants.DEAD_QUEUE)
@RabbitHandler
public void deadQueueListener(Message message, Channel channel) throws IOException {
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
String msg = new String(message.getBody());
System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_RECEIVE ).exchange ( RabbitConstants.FANWEI_EXCHANGE )
.queue (RabbitConstants.APPROVAL_CODE_QUEUE).message (JSON.toJSONString ( requestMessage ))
.error ( requestMessage.getError() ).requstId (requestMessage.getRequestId () ).build();
logMqRequestResumeService.save ( log );
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}