链路压测是一种常见的压测手段,可以测试出系统,链路的性能瓶颈在哪。大公司基本都有根据自己的业务开发的整套链路压测的产品。但是基本没有开源出来,技术细节都是没有的,只是有文章介绍他们的场景和解决方案。本人最近也参与了一个链路压测的项目,把这个项目中的遇到的一些问题和解决写出来,希望给到有需要的人,技术方案并不复杂。
链路压测的操作方式有2中
这里只讨论第二种方式涉及到的问题
说到mq,消息的标记识别需要在发送端跟消费端做处理,也分2种方式
这里针对的都是注解方式使用的rabbitmq,不包括那些硬编码使用mq发送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
System.out.println("是否是压测消息: " + HeaderThreadLocal.isTestRequest());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息发送
public void sendMq(){
rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}
1 先来说说第一种
消息的标记识别实际上就是对发送接收做一个拦截处理,配置2个bean,在bean 的方法里面做拦截的逻辑就可以了
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//消息接收之前加拦截处理,每次接收消息都会调用,是有压测消息标记的,先存到副本变量,后续的操作数据库根据这个变量进行切换影子库
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map header = message.getMessageProperties().getHeaders();
//判断是哪个队列的消息,影子队列的话要动态切换影子库跟后续操作
String queue = message.getMessageProperties().getConsumerQueue();
if (header.containsKey("test")){
HeaderThreadLocal.setIsTestRequest(true);
}
return message;
}
});
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
//发送之前加一个拦截器,每次发送都会调用这个方法,方法名称已经说明了一切了
rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()) {
//拦截逻辑就是如果是压测请求就加个header标记
message.getMessageProperties().getHeaders().put("test", true);
}
return message;
}
});
return rabbitTemplate;
}
2 接下来说说第二种
第二种也很简单,操作如下
使用一个自定义的RabbitTemplate,重写里面的convertAndSend方法,每次调用都判断,是否要切换routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
@Override
public void convertAndSend(String exchange,
String routingKey,
final Object object) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()){
routingKey = routingKey +"-shadow";
}
super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
return rabbitTemplate;
}
这一步参考这个bean 的配置 SimpleRabbitListenerContainerFactory
找到这个bean RabbitListenerAnnotationBeanPostProcessor 里面的这个方法 resolveQueues
private String[] resolveQueues(RabbitListener rabbitListener) {
String[] queues = rabbitListener.queues();
//修改这里面的逻辑,加上这2行代码
String oldQueue = queues[0];
queues = new String[]{oldQueue, oldQueue+"-shadow"};
QueueBinding[] bindings = rabbitListener.bindings();
return result.toArray(new String[result.size()]);
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
//这个类 CustomRabbitListenerAnnotationBeanPostProcessor 的代码时全部复制
//RabbitListenerAnnotationBeanPostProcessor的,只是加了2行代码
return new CustomRabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
这样就可以自动加上影子队列的监听了,一顿操作下来,可以进行测试了,消息是可以动态指定发到对应的队列的