0
点赞
收藏
分享

微信扫一扫

rabbitmq消息标记拦截处理

树下的老石头 2021-09-28 阅读 112

链路压测是一种常见的压测手段,可以测试出系统,链路的性能瓶颈在哪。大公司基本都有根据自己的业务开发的整套链路压测的产品。但是基本没有开源出来,技术细节都是没有的,只是有文章介绍他们的场景和解决方案。本人最近也参与了一个链路压测的项目,把这个项目中的遇到的一些问题和解决写出来,希望给到有需要的人,技术方案并不复杂。

链路压测的操作方式有2中

这里只讨论第二种方式涉及到的问题

说到mq,消息的标记识别需要在发送端跟消费端做处理,也分2种方式

这里针对的都是注解方式使用的rabbitmq,不包括那些硬编码使用mq发送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        System.out.println("是否是压测消息: " + HeaderThreadLocal.isTestRequest());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息发送
public void sendMq(){
        rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}

1 先来说说第一种

消息的标记识别实际上就是对发送接收做一个拦截处理,配置2个bean,在bean 的方法里面做拦截的逻辑就可以了

    @Bean(name = "rabbitListenerContainerFactory")
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                                     ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //消息接收之前加拦截处理,每次接收消息都会调用,是有压测消息标记的,先存到副本变量,后续的操作数据库根据这个变量进行切换影子库
        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                Map header = message.getMessageProperties().getHeaders();
                //判断是哪个队列的消息,影子队列的话要动态切换影子库跟后续操作
                String queue = message.getMessageProperties().getConsumerQueue();
                if (header.containsKey("test")){
                    HeaderThreadLocal.setIsTestRequest(true);
                }
                return message;
            }
        });
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //发送之前加一个拦截器,每次发送都会调用这个方法,方法名称已经说明了一切了
        rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                if (HeaderThreadLocal.isTestRequest()) {
                    //拦截逻辑就是如果是压测请求就加个header标记
                    message.getMessageProperties().getHeaders().put("test", true);
                }
                return message;
            }
        });
        return rabbitTemplate;
    }

2 接下来说说第二种

第二种也很简单,操作如下

使用一个自定义的RabbitTemplate,重写里面的convertAndSend方法,每次调用都判断,是否要切换routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
    public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }
    @Override
    public void convertAndSend(String exchange,
                               String routingKey,
                               final Object object) throws AmqpException {
        if (HeaderThreadLocal.isTestRequest()){
            routingKey = routingKey +"-shadow";
        }
        super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }
}

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
        return rabbitTemplate;
    }
这一步参考这个bean 的配置 SimpleRabbitListenerContainerFactory

找到这个bean RabbitListenerAnnotationBeanPostProcessor 里面的这个方法 resolveQueues

private String[] resolveQueues(RabbitListener rabbitListener) {
        String[] queues = rabbitListener.queues();
        //修改这里面的逻辑,加上这2行代码
        String oldQueue = queues[0];
        queues = new String[]{oldQueue, oldQueue+"-shadow"};
        QueueBinding[] bindings = rabbitListener.bindings();
        return result.toArray(new String[result.size()]);
}
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        //这个类 CustomRabbitListenerAnnotationBeanPostProcessor 的代码时全部复制
        //RabbitListenerAnnotationBeanPostProcessor的,只是加了2行代码
        return new CustomRabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

这样就可以自动加上影子队列的监听了,一顿操作下来,可以进行测试了,消息是可以动态指定发到对应的队列的

举报

相关推荐

0 条评论