rabbitMQ初步使用整合springboot
rabbitMQ简介
MQ简介
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。多用于分布式系统之间进行通信。
为什么要使用MQ
在没使用MQ之前可能需要对多个操作进行同步异步的操作,例如下图,在每项操作完成之后才能返回执行结果。
- 这样会造成大量请求下的系统压力
- 耗时较长,用户体验变差
于是就有了解耦的方式
应用方式
- 任务异步处理
- 应用程序解耦合
- 削峰填谷
协议
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
AMQP
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 与 JMS 区别
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
rabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
Publisher
Exchange
Queue
Binding
Connection
Channel
Consumer
Virtual Host
Broker
rabbitMQ在liunx安装
这里我们使用docker 镜像进行安装
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
安装后可通过管理控制台打开
http://ip:15672
4369,25672(Erlang发现&集群端口)
5672,5671(AMQP端口)
15672 (web管理后台端口)
61613,61614(STOMP协议端口)
1883,8883(MQTT协议端口)
整合springboot初步测试
首先在 pom文件中添加依赖
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
启动类中开启rabbit
Start类
@EnableRabbit
@MapperScan("com.xfwang.gulimall.order.dao")
@SpringBootApplication
public class GulimallOrderApplication {
...
}
测试中通过
AmqpAdmin和rabbitTemplate
去创建rabbitMQ的组件的发送消息
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}",reasonEntity);
}
/**
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
* 2、如何收发消息
*/
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java-exchange");
}
@Test
public void testCreateQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","hello-java-queue");
}
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:","hello-java-binding");
}
@Test
public void create() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","order.delay.queue");
}
}