Integrating ActiveMQ with Spring Boot 2.0

【1】pom file

Add the ActiveMQ dependencies:

<!-- ActiveMQ integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Connection pooling, needed when spring.activemq.pool.enabled is true -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

【2】yml configuration

The yml configuration file is as follows:

spring:
  activemq:
    user: root
    password: 123456
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connections: 50
    packages:
      trust-all: true

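【3】Producer and consumer queue

The main application class is configured as follows:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms // enables @JmsListener processing for ActiveMQ
public class HhProvinceApplication {

    public static void main(String[] args) {
        SpringApplication.run(HhProvinceApplication.class, args);
    }
}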

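MyActiveMQConfig is as follows:

import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyActiveMQConfig {

    // Point-to-point queue that carries the visit-log messages
    @Bean
    public Queue logQueue() {
        return new ActiveMQQueue("app.log");
    }
}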

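Producer example:

import javax.jms.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {

    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue logQueue;

    /* Insert a visit log by sending it to the queue */
    @Override
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        log.debug("insertVisitLog: request received, enqueueing visit log -- {}", sysVisitLog);
        jmsMessagingTemplate.convertAndSend(logQueue, sysVisitLog);
    }
}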
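
A note on the payload: assuming no custom MessageConverter bean is registered, Spring's default SimpleMessageConverter sends a non-String object such as SysVisitLog as a JMS ObjectMessage, which relies on Java serialization. The class therefore has to implement java.io.Serializable, and the consumer has to trust its package (the trust-all: true setting in the yml above). The following is a minimal JDK-only sketch of that round trip. VisitLogPayload and its two fields are hypothetical stand-ins, since the article does not show the real SysVisitLog class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializablePayloadDemo {

    /** Hypothetical stand-in for SysVisitLog; the real fields are not shown in the article. */
    public static class VisitLogPayload implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String url;
        private final long userId;

        public VisitLogPayload(String url, long userId) {
            this.url = url;
            this.userId = userId;
        }

        public String getUrl() { return url; }

        public long getUserId() { return userId; }
    }

    /** Serializes the payload to bytes and reads it back, much as an object message would. */
    public static VisitLogPayload roundTrip(VisitLogPayload payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return (VisitLogPayload) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // A class that does not implement Serializable fails here with NotSerializableException
            throw new IllegalStateException("Payload could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        VisitLogPayload copy = roundTrip(new VisitLogPayload("/index", 42L));
        System.out.println(copy.getUrl() + " " + copy.getUserId());
    }
}
```

If the payload class omitted Serializable, the converter would reject it before anything reached the broker, so checking the class this way is a cheap first test.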

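Consumer example:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    SysVisitLogMapper visitLogMapper;

    // Listens on the same queue name that MyActiveMQConfig declares
    @JmsListener(destination = "app.log")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        int i = visitLogMapper.insertSelective(sysVisitLog);
        log.info("Consumer inserted visit log, rows affected: {} -- sysVisitLog: {}", i, sysVisitLog);
    }
}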

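【4】Using ActiveMQ in the earlier SSM setup

Previously, in an SSM (Spring MVC, Spring, MyBatis) project, ActiveMQ was configured mainly through XML, while the producer and consumer in the code still used annotations.

The ActiveMQ XML configuration is as follows: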

<!-- Queue destination (point-to-point) -->
<bean id="InsertVisitLogQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>InsertVisitLogQueue</value>
    </constructor-arg>
</bean>

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616" />
            <property name="userName" value="root" />
            <property name="password" value="123456" />
            <property name="useAsyncSend" value="true" />
            <property name="trustAllPackages" value="true"/>
        </bean>
    </property>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="receiveTimeout" value="2000" />
</bean>

<!-- Enables @JmsListener so that listeners are started automatically -->
<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

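Compared with the XML above, Spring Boot 2.0 needs far less configuration: the connection factory, the JmsTemplate, and the listener container factory are all auto-configured.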

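Producer example:

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ScheduledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {

    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Qualifier("InsertVisitLogQueue")
    @Autowired
    private Destination destinationInsertVisitLogQueue;

    /* Insert into tb_sys_visit_log */
    @Override
    public void insertVisitLog(final SysVisitModel sysVisitModel) {
        log.debug("insertVisitLog: request received, enqueueing visit log -- {}", sysVisitModel);
        // Send on a separate thread so the caller is not blocked by the broker round trip
        new Thread(new Runnable() {
            @Override
            public void run() {
                jmsTemplate.send(destinationInsertVisitLogQueue, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        // SysVisitModel must implement java.io.Serializable
                        ObjectMessage msg = session.createObjectMessage();
                        msg.setObject(sysVisitModel);
                        /* Deliver after one minute; the broker needs schedulerSupport="true" */
                        long delay = 60 * 1000;
                        msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                        return msg;
                    }
                });
            }
        }).start();
    }
}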
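
The producer above starts a new Thread for every message. One possible alternative, sketched here with JDK classes only, is to hand each send to a shared, bounded thread pool so that a burst of requests cannot create an unbounded number of threads. BoundedAsyncSender and its method names are hypothetical and not part of the original project; the Runnable passed in stands in for the jmsTemplate.send(...) call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncSender {

    // Fixed-size pool: at most poolSize sends run at once, the rest wait in the pool's queue
    private final ExecutorService executor;

    public BoundedAsyncSender(int poolSize) {
        this.executor = Executors.newFixedThreadPool(poolSize);
    }

    /** Queues a send task; in the service above the task body would call jmsTemplate.send(...). */
    public void sendAsync(Runnable sendTask) {
        executor.submit(sendTask);
    }

    /** Stops accepting tasks and waits for queued sends; returns true if all of them finished. */
    public boolean shutdownAndWait(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedAsyncSender sender = new BoundedAsyncSender(4);
        AtomicInteger sent = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            sender.sendAsync(sent::incrementAndGet); // stand-in for the real JMS send
        }
        boolean finished = sender.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println(finished + " " + sent.get());
    }
}
```

In a Spring application the pool would normally be a singleton bean that is shut down with the context. Whether the extra hand-off is needed at all depends on the setup, since the connection factory above already sets useAsyncSend to true.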

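Consumer example:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    // concurrency = "10-20": between 10 and 20 concurrent consumers on this queue
    @JmsListener(destination = "InsertVisitLogQueue", concurrency = "10-20")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitModel sysVisitModel) {
        log.info("Consumer received sysVisitModel: {} on thread {}", sysVisitModel, Thread.currentThread().getName());
        //...
    }
}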
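
The concurrency attribute on @JmsListener takes either a "lower-upper" string such as "10-20" or a single upper limit such as "10", in which case the lower limit is 1. The sketch below only illustrates that rule with a small hypothetical helper, ConcurrencyRange; it is not Spring's own parsing code.

```java
public class ConcurrencyRange {

    private final int min;
    private final int max;

    private ConcurrencyRange(int min, int max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency range: " + min + "-" + max);
        }
        this.min = min;
        this.max = max;
    }

    /** Parses "lower-upper" (for example "10-20") or a single upper limit (for example "10"). */
    public static ConcurrencyRange parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator == -1) {
            // A single number is the upper limit; the lower limit defaults to 1
            return new ConcurrencyRange(1, Integer.parseInt(concurrency.trim()));
        }
        int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
        int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
        return new ConcurrencyRange(lower, upper);
    }

    public int getMin() { return min; }

    public int getMax() { return max; }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("10-20");
        System.out.println(range.getMin() + " to " + range.getMax() + " consumers");
    }
}
```

With "10-20", the listener container keeps at least 10 consumers running and can scale up to 20 under load.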

pom dependencies:

<!-- ActiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

ActiveMQ installation:

For a comparison of MQ products, see the blog post "各类MQ比较分析" (a comparative analysis of message queues).

