Contents
Introduction to Message Queues
Installing RabbitMQ
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
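1. Grant permissions: rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}
-p vhostpath: the virtual host (resource namespace) the permissions apply to; for example, -p / selects the default vhost /.
user: the name of the user to grant permissions to.
conf: a regular expression matching the resources the user may configure.
write: a regular expression matching the resources the user may write to.
read: a regular expression matching the resources the user may read from.
Example:
rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
This grants the user root configure, write, and read permissions on all resources in the vhost /.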
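2. List the permissions of all users in a vhost: rabbitmqctl list_permissions [-p vhostpath]
Example:
List the permissions of all users in the default vhost /
rabbitmqctl list_permissions
List the permissions of all users in a specific vhost
rabbitmqctl list_permissions -p /abc
3. List the permissions of a specific user: rabbitmqctl list_user_permissions {username}
Example:
List the permissions of the user root
rabbitmqctl list_user_permissions root
4. Clear a user's permissions: rabbitmqctl clear_permissions [-p vhostpath] {username}
Example:
Clear the permissions of the user root in the default vhost
rabbitmqctl clear_permissions root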
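The conf, write, and read arguments of set_permissions are regular expressions matched against resource names such as queue and exchange names. The following is a minimal sketch of that idea in plain Java, not RabbitMQ's actual implementation. The class PermissionSketch, the method allows, and the "order." naming scheme are hypothetical, and the restrictive pattern is anchored explicitly with ^ and $ so the result does not depend on how the broker anchors expressions.

```java
import java.util.regex.Pattern;

public class PermissionSketch {
    // Simplified model (an assumption): a permission is a regular expression
    // that must match the whole resource name.
    public static boolean allows(String permissionRegex, String resourceName) {
        return Pattern.compile(permissionRegex).matcher(resourceName).matches();
    }

    public static void main(String[] args) {
        // '.*' is the pattern from the set_permissions example: it matches every name.
        System.out.println(allows(".*", "myQueue"));                 // true
        // A hypothetical restrictive pattern: only resources whose name starts with "order."
        System.out.println(allows("^order\\..*$", "order.created")); // true
        System.out.println(allows("^order\\..*$", "myQueue"));       // false
    }
}
```

On the command line the same restrictive pattern would be passed in single quotes, in the position of conf, write, or read.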
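A vhost is a namespace in RabbitMQ. It determines where messages are stored and serves as the unit of permission control, much like folders in Windows, where different files are kept in different folders.
1. Add a vhost: rabbitmqctl add_vhost {name}
Example:
rabbitmqctl add_vhost test
2. Delete a vhost: rabbitmqctl delete_vhost {name}
Example:
rabbitmqctl delete_vhost test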
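Sending and Receiving Messages with RabbitMQ
1. Message
A message is unnamed and consists of a header and a body. The body is opaque, while the header is a set of optional properties such as routing-key, priority (relative to other messages), and delivery-mode (which indicates that the message may need persistent storage).
2. Publisher
The producer of messages: a client application that publishes messages to an exchange.
3. Exchange
Receives the messages sent by producers and routes them to queues on the server.
4. Binding
The association between a queue and an exchange. A binding is a routing rule that connects an exchange to a queue by means of a routing key, so an exchange can be thought of as a routing table made up of bindings.
5. Queue
Holds messages until they are delivered to a consumer. A queue is both the container for messages and their final destination. A message can be placed in one or more queues, where it stays until a consumer connects to the queue and takes it.
6. Connection
A network connection, for example a TCP connection.
7. Channel
An independent bidirectional data stream inside a multiplexed connection. A channel is a virtual connection established inside a real TCP connection. All AMQP commands are sent over a channel, whether they publish a message, subscribe to a queue, or receive a message. Because creating and tearing down TCP connections is expensive for the operating system, channels were introduced so that a single TCP connection can be reused.
8. Consumer
The consumer of messages: a client application that takes messages from a queue.
9. Virtual Host
A virtual host groups a set of exchanges, queues, and related objects. Virtual hosts are independent server domains that share the same authentication and encryption environment. Each vhost is essentially a miniature RabbitMQ server with its own queues, exchanges, bindings, and permissions. The vhost is fundamental to AMQP and must be specified when connecting. The default vhost in RabbitMQ is /.
10. Broker
The message queue server itself.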
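Item 4 describes an exchange as a routing table built from bindings. A minimal sketch of that picture, assuming direct-exchange semantics (the routing key must equal the binding key exactly). The class RoutingTableSketch and its methods are hypothetical and only model the lookup; the queue and key names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoutingTableSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Record a binding between a queue and this (modelled) exchange.
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Direct-exchange style lookup: only an exact key match selects a queue.
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        RoutingTableSketch exchange = new RoutingTableSketch();
        exchange.bind("myQueue", "directRoutingKey");
        System.out.println(exchange.route("directRoutingKey")); // [myQueue]
        System.out.println(exchange.route("otherKey"));         // []
    }
}
```

A message whose routing key selects no queue has nowhere to go in this model, which mirrors why a queue must be bound before messages are published to the exchange.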
Message Test
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>
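import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Producer: publishes one message to the queue "myQueue" through the default exchange.
public class App {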
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
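// Declare a durable, non-exclusive, non-auto-delete queue with no extra arguments.
channel.queueDeclare("myQueue",true,false,false,null);
String message = "RabbitMQ Message Test";
// An empty exchange name selects the default exchange, which routes by queue name.
channel.basicPublish("","myQueue",null,message.getBytes("utf-8"));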
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

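import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Consumer: subscribes to "myQueue" and prints every message it receives.
public class App {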
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
// autoAck = true: a message counts as acknowledged as soon as it is delivered. The empty consumer tag lets the server generate one.
channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("Message: "+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Exchange Tests
direct exchange
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myQueue",true,false,false,null);
channel.exchangeDeclare("myExchange","direct",true);
channel.queueBind("myQueue","myExchange","directRoutingKey");
String message = "direct message test";
channel.basicPublish("myExchange","directRoutingKey",null,message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myQueue",true,false,false,null);
channel.exchangeDeclare("myExchange","direct",true);
channel.queueBind("myQueue","myExchange","directRoutingKey");
channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("Message: "+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
fanout exchange
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("fanoutExchange","fanout",true);
String message = "fanout message test";
channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class App1 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.exchangeDeclare("fanoutExchange","fanout",true);
channel.queueBind(queueName,"fanoutExchange","");
channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("Message: "+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
topic exchange
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("topicExchange","topic",true);
String message = "message test";
channel.basicPublish("topicExchange","aa.123",null,message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class App1 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("topicQueue",true,false,false,null);
channel.exchangeDeclare("topicExchange","topic",true);
channel.queueBind("topicQueue","topicExchange","aa.*");
channel.basicConsume("topicQueue",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("Message: "+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
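The consumer above binds its queue with the pattern aa.* and the producer publishes with the routing key aa.123. In a topic exchange, keys are dot-separated words, * stands for exactly one word, and # stands for zero or more words. The following is a minimal sketch of that matching rule in plain Java; the class TopicMatchSketch is hypothetical and is not how the broker implements routing.

```java
public class TopicMatchSketch {
    // Returns true if the routing key matches the binding key under topic-exchange rules.
    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    // Compares pattern words p[i..] against routing-key words w[j..].
    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern exhausted: all words must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // pattern still expects a word, but none is left
        }
        // '*' matches any single word; otherwise the words must be equal.
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("aa.*", "aa.123"));   // true
        System.out.println(matches("aa.*", "aa.123.x")); // false: '*' covers one word only
        System.out.println(matches("aa.#", "aa.123.x")); // true: '#' covers any number of words
    }
}
```

This is why a binding of aa.* receives aa.123 but would miss aa.123.x, while aa.# would receive both.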
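Transactional Messages
channel.txSelect();   // put the channel into transaction mode
channel.txCommit();   // commit the messages published in the current transaction
channel.txRollback(); // discard the messages published in the current transaction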
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("fanoutExchange","fanout",true);
String message = "message test";
channel.txSelect();
channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
channel.txCommit();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
// Roll back only if a channel exists and is still open; channel is null when newConnection() or createChannel() failed.
if (channel != null && channel.isOpen()) {
try {
channel.txRollback();
} catch (IOException ex) {
ex.printStackTrace();
}
}
} finally {
if (channel != null) {
try {
// No rollback here: the transaction was either committed above or rolled back in the catch block.
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Publisher Confirms
package com.java;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("fanoutExchange","fanout",true);
channel.confirmSelect();
String message = "message test";
channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
channel.waitForConfirmsOrDie();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class App {
public static void main( String[] args ) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("fanoutExchange","fanout",true);
channel.confirmSelect();
String message = "message test";
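// Register the listener before publishing so that no confirm is missed. ConfirmListener is in com.rabbitmq.client.
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// The broker confirmed a message; multiple = true means every message up to and including deliveryTag is confirmed.
System.out.println("ack: " + deliveryTag + ", multiple: " + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// The broker could not take responsibility for a message; the publisher may need to send it again.
System.out.println("nack: " + deliveryTag + ", multiple: " + multiple);
}
});
channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));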
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
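With asynchronous confirms, the publisher usually has to remember which messages are still unconfirmed so that handleAck and handleNack can act on them. A minimal sketch of that bookkeeping in plain Java, with no broker involved: the class ConfirmTrackerSketch and its methods are hypothetical. In a real publisher the sequence number would typically be read from channel.getNextPublishSeqNo() just before each basicPublish, and handleAck below would be called from the ConfirmListener callback.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmTrackerSketch {
    // sequence number -> message body, for messages that are not yet confirmed
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before publishing a message.
    public void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Mirrors the ConfirmListener.handleAck(deliveryTag, multiple) signature.
    public void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed at once.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    public int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.published(1L, "a");
        tracker.published(2L, "b");
        tracker.published(3L, "c");
        tracker.handleAck(2L, true);                    // confirms 1 and 2 together
        System.out.println(tracker.outstandingCount()); // 1
        tracker.handleAck(3L, false);                   // confirms 3 only
        System.out.println(tracker.outstandingCount()); // 0
    }
}
```

A sorted concurrent map is used because confirms arrive on a different thread than the one publishing, and the multiple flag requires removing a whole range of sequence numbers.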
public class App1 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.exchangeDeclare("fanoutExchange","fanout",true);
channel.queueBind(queueName,"fanoutExchange","");
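// autoAck = false: every message must be acknowledged explicitly with basicAck.
channel.basicConsume(queueName,false,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("Message: "+message);
long num = envelope.getDeliveryTag();
Channel c = super.getChannel();
// multiple = true acknowledges this delivery and every earlier unacknowledged delivery on this channel; pass false to acknowledge only this one.
c.basicAck(num,true);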
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Integrating RabbitMQ with Spring Boot
spring.rabbitmq.host=192.168.73.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
@Configuration
public class RabbitMQConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
@Bean
public Queue directQueue(){
return new Queue("directQueue");
}
@Bean
public Binding binding(Queue directQueue,DirectExchange directExchange){
return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting");
}
}
@Controller
public class app {
@Resource
private AmqpTemplate amqpTemplate;
@RequestMapping(value = "/send")
@ResponseBody
public void rabbitMQMessage(){
amqpTemplate.convertAndSend("directExchange","directRouting","Message test");
}
}
@Component
public class app {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(queues = {"directQueue"})
public void rabbitMQMessage(String message){
System.out.println(message);
}
}
@Component
public class app {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(),
exchange = @Exchange(name = "fanoutExchange",type = "fanout"))
})
public void rabbitMQMessage(String message){
System.out.println(message);
}
}
@Configuration
public class RabbitMQConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
}
@Controller
public class RabbitMQConfig {
@Resource
private AmqpTemplate amqpTemplate;
@RequestMapping(value = "/send")
@ResponseBody
public void sendMessage(){
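// Use the three-argument form (exchange, routing key, message). With two arguments the first is taken as the routing key and the message goes to the default exchange. A fanout exchange ignores the routing key, so it is left empty.
amqpTemplate.convertAndSend("fanoutExchange","","message test");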
}
}
@Component
public class app {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topic"),
key = {"aa.#"},
exchange = @Exchange(name = "topicExchange",type = "topic"))
})
public void rabbitMQMessage(String message){
System.out.println(message);
}
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Controller
public class RabbitMQConfig {
@Resource
private AmqpTemplate amqpTemplate;
@RequestMapping(value = "/send")
@ResponseBody
public void sendMessage(){
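// Exchange, routing key, message. The routing key "aa.123" is an example value that matches the listener's binding key "aa.#".
amqpTemplate.convertAndSend("topicExchange","aa.123","message test");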
}
}
RabbitMQ Cluster
First machine
127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.222.129 A
192.168.222.130 B
Second machine
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.222.129 A
192.168.222.130 B
Command: cat /var/lib/rabbitmq/.erlang.cookie
This prints the Erlang cookie, a secret string such as CGCMOJQVMLUNUCQNSLFD
Command: scp /var/lib/rabbitmq/.erlang.cookie 192.168.222.130:/var/lib/rabbitmq
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@B
rabbitmqctl start_app
Connecting Spring Boot to the Cluster
spring.rabbitmq.addresses=192.168.73.132:5672,192.168.73.133:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123