0
点赞
收藏
分享

微信扫一扫

封装SpringBootRabbitMq

前言

消息中间件可以解决什么应用场景

  • 1、异步处理(缓存处理)
  • 2、应用解耦
  • 3、处理高并发(一般在秒杀或团抢活动中使用广泛)
    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列
    1. 可以控制活动的人数;
    1. 可以缓解短时间内高流量压垮应用;
  • 4、分布式事物一致性

中间件产品选型

RabbitMQ系统架构

1.几个概念说明:

  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
  • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息的载体,每个消息都会被投到一个或多个队列。
  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

  • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

  • Producer:消息生产者,就是投递消息的程序.

  • Consumer:消息消费者,就是接受消息的程序.

  • Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

  • Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

  • routing key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes

  • binding key:在绑定(Binding)Exchange与Queue的同时一般会指定一个bindingkey;消费者将消息发送给Exchange时,一般会指定一个routingkey;当bindingkey与routingkey相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key并不是在所有情况下都生效,它依赖于ExchangeType,比如fanout类型的Exchange就会无视bindingkey,而是将消息路由到所有绑定到该Exchange的Queue。

  • headers:类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
    在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

2.任务分发机制

2.1Round-robin dispathching循环分发
2.2Message acknowledgment消息确认

3.Message durability消息持久化

  1. exchange持久化,在声明时指定durable => true
  2. queue持久化,在声明时指定durable => true
  3. 消息持久化,在投递时指定delivery_mode => 2(1是非持久化).

4.Fair dispath 公平分发

注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

5.分发到多个Consumer

5.1Exchange

交换机路由的几种类型:

  • Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息,是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列.
  • Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key.
  • Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
  • Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
  • 默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去
5.2 Bindings 绑定
5.3Direct exchange
5.4Fanout Exchange
5.5Topic Exchange
  1. 这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routingKey),Exchange会将消息转发到所有关注主题能与routingKey模糊匹配的队列。
  2. 这种模式需要routingKey,也许要提前绑定Exchange与Queue。
  3. 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
  4. “#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
  5. 同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
    [图片上传失败...(image-c534a0-1527837390951)]

6.消息序列化

  1. 简单
  2. size小了3-10倍
  3. 速度快了20-100倍
  4. 易于编程
  5. 减少了语义的歧义.
    ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛

RabbitMQ安装

docker pull rabbitmq

7.blf-rabbitmq-core

<dependency>
   <groupId>com.belle.blf</groupId>
   <artifactId>blf-rabbitmq-core</artifactId>
</dependency>

Configuration

@Configuration
//@EnableConfigurationProperties(RabbitGeneralProperties.class)
@ConditionalOnBean(annotation = EnableDirect.class)
public class RabbitConfig {

    @Autowired
    ConnectionFactory connectionFactory;
    //@Autowired
    //RabbitGeneralProperties rabbitGeneralProperties;

    @Value("${rabbit.general.direct.queue}")
    private String directQueue;

    /*@Bean
    public Queue queueGeneral(){
        return  new Queue(RabbitConstants.QUEUE_DIRECT_GENERAL);
    }*/


    /**
     * 动态生成队列
     * @return
     * @throws AmqpException
     * @throws IOException
     */
    @Bean
    public String[] mqMsgQueues() throws AmqpException ,IOException{

        if(StringUtils.isEmpty(directQueue)){
            directQueue=RabbitConstants.QUEUE_DIRECT_GENERAL;
        }
        String[] queueNames =directQueue.split(",");
        for(int i=0;i<queueNames.length;i++){
            connectionFactory.createConnection().createChannel(false).queueDeclare(queueNames[i], true, false, false, null);
        }
        return queueNames;
    }


}

/**
 * @description: 广播发送
 * @author: tan.bin
 * @create: 2018-05-30 17:54
 **/
@Configuration
@ConditionalOnBean(annotation = EnableFanout.class)
public class FanoutRabbitConfig {


    @Autowired
    ConnectionFactory connectionFactory;
    /**
     * exchange queue 多对多关系 一个交换下可以绑定多个队列 一个队列可以被多个交换机绑定
     */

    @Value("${rabbit.general.fanout.exchange}")
    private String fanoutExchange;


    @Value("${rabbit.general.fanout.queue}")
    private String fanoutQueue;


    /**
     * 声明交换机
     * @return
     * @throws AmqpException
     * @throws IOException
     */
    @Primary
    @Bean
    public String mqMsgFanoutExchange() throws AmqpException,IOException {

        if(StringUtils.isEmpty(fanoutExchange)){
            fanoutQueue=RabbitConstants.FANOUT_EXCHANGE_GENERAL;
        }
        String exchangeNames =fanoutExchange;
        //创建队列
        connectionFactory.createConnection().createChannel(false).exchangeDeclare(fanoutExchange,BuiltinExchangeType.FANOUT,true);

        return fanoutExchange;
    }

    /**
     * 队列声明、与绑定与交换机关系
     * @return
     * @throws AmqpException
     * @throws IOException
     */
    @Bean
    public String[] mqMsgFanoutQueues() throws AmqpException,IOException {

        if(StringUtils.isEmpty(fanoutQueue)){
            fanoutQueue=RabbitConstants.FANOUT_EXCHANGE_GENERAL;
        }
        String[] queueNames =fanoutQueue.split(",");
        for(int i=0;i<queueNames.length;i++){
            //创建队列
            connectionFactory.createConnection().createChannel(false).queueDeclare(queueNames[i], true, false, false, null);
            //绑定交换机与队列
            connectionFactory.createConnection().createChannel(false).queueBind(queueNames[i],fanoutExchange,"");
        }
        return queueNames;
    }
}

@Configuration
@ConditionalOnBean(annotation = EnableTopic.class)
public class TopicRabbitConfig {


    private final Logger logger = LoggerFactory.getLogger(TopicRabbitConfig.class);

    @Autowired
    ConnectionFactory connectionFactory;
    /**
     * exchange queue 多对多关系 一个交换下可以绑定多个队列 一个队列可以被多个交换机绑定
     */

    @Value("${rabbit.general.topic.exchange}")
    private String topicExchange;


    @Value("${rabbit.general.topic.queue}")
    private String topicQueue;


    //主题关键字
    @Value("${rabbit.general.topic.routingKey}")
    private String routingKey;

    /**
     * 声明交换机
     * @return
     * @throws AmqpException
     * @throws IOException
     */
    @Primary
    @Bean
    public String mqMsgTopicExchange() throws AmqpException,IOException {

        if(StringUtils.isEmpty(topicExchange)){
            topicExchange=RabbitConstants.TOPIC_EXCHANGE_GENERAL;
        }
        String exchangeNames =topicExchange;
        //声明交换机
        connectionFactory.createConnection().createChannel(false).exchangeDeclare(topicExchange,BuiltinExchangeType.TOPIC,true);

        return topicExchange;
    }

    /**
     * 队列声明、与绑定与交换机关系
     * @return
     * @throws AmqpException
     * @throws IOException
     */
    @Bean
    public String[] mqMsgTopicQueues() throws AmqpException,IOException {

        if(StringUtils.isEmpty(topicQueue)){
            topicQueue=RabbitConstants.FANOUT_EXCHANGE_GENERAL;
        }
        String[] queueNames =topicQueue.split(",");
        String[] routingKeyName=routingKey.split(",");//与队列是一一对应
        for(int i=0;i<queueNames.length;i++){
            logger.info("queueNames:{}==routingKey:{}",queueNames[i],routingKeyName[i]);
            //创建队列
            connectionFactory.createConnection().createChannel(false).queueDeclare(queueNames[i], true, false, false, null);
            //绑定交换机与队列
            connectionFactory.createConnection().createChannel(false).queueBind(queueNames[i],topicExchange,routingKeyName[i]);
        }
        return queueNames;
    }
}
选择发送消息对方式
@SpringBootApplication
@EnableFanout
@EnableDirect
@EnableTopic
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

7.1消息三种方式

spring:
  rabbitmq:
    host: 172.20.32.126
    port: 5672
    username: guest
    password: guest



rabbit:
  general:
    direct:
      queue: text_queue #队列名称
    fanout:                               #需要绑定交换机与队列关系
      exchange: fanout_exchange_general   #交换机名称
      queue:  queue_fanout_general,queue_fanout_general1 #队列名称
    topic:                               #需要绑定交换机与队列关系还需要设置ROUTE_KEY
      exchange: topic_exchange_general
      queue:  queue_topic_general,queue_topic_general1  #一对一关系 顺序队列
      routingKey: blf.#,itg.uc
7.1.1Direct
 > producer

@RunWith(SpringRunner.class) // SpringJUnit支持,由此引入Spring-Test框架支持!
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ApplicationTest {

    @Autowired
    RabbitmqSend rabbitmqSend;
    @Test
    public void doTest(){

        //发送10条件消息
        for(int i=0;i<10;i++){
            rabbitmqSend.sendDirect(RabbitConstants.QUEUE_GENERAL,("hello==="+i).toString());
        }

    }
}

> consumer

@Component
@RabbitListener(queues = RabbitConstants.QUEUE_GENERAL)
public class HelloReceiverA {

    @RabbitHandler
    public void process(String context) {
        System.out.println("客户端监听2:"+context);
    }
}

@Component
@RabbitListener(queues = RabbitConstants.QUEUE_GENERAL)
public class HelloReceiverB {

    @RabbitHandler
    public void process(String context) {
        System.out.println("客户端监听1:"+context);
    }
}

> 结果输出
客户端监听2:hello2
客户端监听2:hello3
客户端监听1:hello4
客户端监听2:hello5
客户端监听1:hello6
客户端监听2:hello7
客户端监听1:hello8
客户端监听2:hello9
7.1.2Fanout
 > producer

public void doFanoutTest(){

    //发送10条件消息
    for(int i=0;i<10;i++){
        System.out.println("消息发送:"+("hello==="+i).toString());
        rabbitmqSend.sendFanoutMessage(RabbitConstants.FANOUT_EXCHANGE_GENERAL,("hello==="+i).toString());
    }

}

> consumer

@Component
@RabbitListener(queues = RabbitConstants.QUEUE_FANOUT_GENERAL)
public class HelloFanoutReceiverA {

    @RabbitHandler
    public void process(String context) {
        System.out.println("Fanout客户端监听2:"+context);
    }
}
@RabbitListener(queues = "queue_fanout_general1")
public class HelloFanoutReceiverB {

    @RabbitHandler
    public void process(String context) {
        System.out.println("Fanout客户端监听2:"+context);
    }
}
Fanout客户端监听2:hello===0
Fanout客户端监听1:hello===0
Fanout客户端监听1:hello===1
Fanout客户端监听2:hello===1
Fanout客户端监听1:hello===2
Fanout客户端监听2:hello===2
Fanout客户端监听1:hello===3
Fanout客户端监听2:hello===3
Fanout客户端监听2:hello===4
Fanout客户端监听1:hello===4
Fanout客户端监听2:hello===5
Fanout客户端监听1:hello===5
Fanout客户端监听2:hello===6
Fanout客户端监听1:hello===6
Fanout客户端监听2:hello===7
Fanout客户端监听1:hello===7
Fanout客户端监听2:hello===8
Fanout客户端监听1:hello===8
Fanout客户端监听2:hello===9
Fanout客户端监听1:hello===9
7.1.2Topic
 > producer
//发送10条件消息
for(int i=0;i<10;i++){
    System.out.println("消息发送:"+("hello==="+i).toString());
    if(i<5) {
        rabbitmqSend.senTopicMessage(RabbitConstants.TOPIC_EXCHANGE_GENERAL, "itg.uc", ("hello===" + i).toString());
    }else {
        rabbitmqSend.senTopicMessage(RabbitConstants.TOPIC_EXCHANGE_GENERAL, "blf.uc", ("hello===" + i).toString());

    }
}

> consumer

@Component
@RabbitListener(queues = RabbitConstants.QUEUE_TOPIC_GENERAL)
public class HelloTopReceiverA {

    @RabbitHandler
    public void process(String context) {
        System.out.println("top客户端监听2:"+context);
    }
}

@Component
@RabbitListener(queues = "queue_topic_general1")
public class HelloTopReceiverB {

    @RabbitHandler
    public void process(String context) {
        System.out.println("top客户端监听1:"+context);
    }
}

top客户端监听1:hello===1
top客户端监听2:hello===6
top客户端监听1:hello===2
top客户端监听2:hello===7
top客户端监听1:hello===3
top客户端监听2:hello===8
top客户端监听1:hello===4
top客户端监听2:hello===9
举报

相关推荐

0 条评论