0
点赞
收藏
分享

微信扫一扫

rabbitMQ初步使用

秀妮_5519 2022-01-31 阅读 63

rabbitMQ初步使用整合springboot

rabbitMQ简介

MQ简介

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。多用于分布式系统之间进行通信。

为什么要使用MQ

在没使用MQ之前可能需要对多个操作进行同步异步的操作,例如下图,在每项操作完成之后才能返回执行结果。

  • 这样会造成大量请求下的系统压力
  • 耗时较长,用户体验变差

没用MQ之前的业务系统操作
于是就有了解耦的方式
在这里插入图片描述

应用方式

  1. 任务异步处理
  1. 应用程序解耦合
  1. 削峰填谷

协议

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

AMQP
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

AMQP 与 JMS 区别

JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富

rabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
在这里插入图片描述

核心概念

Message

Publisher

Exchange

Queue

Binding

Connection

Channel

Consumer

Virtual Host

Broker

rabbitMQ在liunx安装

这里我们使用docker 镜像进行安装

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p  15671:15671 -p 15672:15672 rabbitmq:management

安装后可通过管理控制台打开
http://ip:15672

4369,25672(Erlang发现&集群端口)
5672,5671(AMQP端口)
15672 (web管理后台端口)
61613,61614(STOMP协议端口)
1883,8883(MQTT协议端口)

整合springboot初步测试

首先在 pom文件中添加依赖
pom.xml

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

启动类中开启rabbit
Start类

@EnableRabbit
@MapperScan("com.xfwang.gulimall.order.dao")
@SpringBootApplication
public class GulimallOrderApplication {
...
}

测试中通过
AmqpAdminrabbitTemplate
去创建rabbitMQ的组件的发送消息

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {

	@Autowired
	private AmqpAdmin amqpAdmin;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Test
	public void sendMessageTest() {
		OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
		reasonEntity.setId(1L);
		reasonEntity.setCreateTime(new Date());
		reasonEntity.setName("reason");
		reasonEntity.setStatus(1);
		reasonEntity.setSort(2);
		String msg = "Hello World";
		//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

		//2、发送的对象类型的消息,可以是一个json
		rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
				reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
		log.info("消息发送完成:{}",reasonEntity);
	}

	/**
	 * 1、如何创建Exchange、Queue、Binding
	 *      1)、使用AmqpAdmin进行创建
	 * 2、如何收发消息
	 */
	@Test
	public void createExchange() {

		Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
		amqpAdmin.declareExchange(directExchange);
		log.info("Exchange[{}]创建成功:","hello-java-exchange");
	}



	@Test
	public void testCreateQueue() {
		Queue queue = new Queue("hello-java-queue",true,false,false);
		amqpAdmin.declareQueue(queue);
		log.info("Queue[{}]创建成功:","hello-java-queue");
	}


	@Test
	public void createBinding() {

		Binding binding = new Binding("hello-java-queue",
				Binding.DestinationType.QUEUE,
				"hello-java-exchange",
				"hello.java",
				null);
		amqpAdmin.declareBinding(binding);
		log.info("Binding[{}]创建成功:","hello-java-binding");

	}

	@Test
	public void create() {
		HashMap<String, Object> arguments = new HashMap<>();
		arguments.put("x-dead-letter-exchange", "order-event-exchange");
		arguments.put("x-dead-letter-routing-key", "order.release.order");
		arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
		Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
		amqpAdmin.declareQueue(queue);
		log.info("Queue[{}]创建成功:","order.delay.queue");
	}


}
举报

相关推荐

0 条评论