RocketMQ error: Cannot find a @StreamListener matching for message with id: 99a2a40a-fcf9-800a-384d-e3782846c0ed
Cause:
After integrating RocketMQ with Spring Cloud Stream, the code declares a listener with a header condition:
@StreamListener(value = OrderEventSink.ORDER_STATUS_CHANGED_EVENT_INPUT, condition = "headers['rocketmq_TAGS']=='ORDER'")
    public void handleOrderStatusChangedEvent(Message<OrderStatusChangedEvent> message) {} 
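However, the configuration file only sets the destination and leaves out the RocketMQ tag filter:
spring:
  cloud:
    stream:
      bindings:
        orderStatusChangedEventInput:
          destination: order_status_updated_event_topic
      # The section below was missing from the configuration
      rocketmq:
        bindings:
          # This key should be the same binding name as under spring.cloud.stream.bindings
          OrderStatusChangedEventInput:
            consumer:
              tags: ORDER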
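When a message is delivered to the service but is then filtered out because no listener condition matches it, this warning is logged.
// The message comes from the method org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler#handleRequestMessage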
protected Object handleRequestMessage(Message<?> requestMessage) {
        List<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> matchingHandlers = this.evaluateExpressions ? this.findMatchingHandlers(requestMessage) : this.handlerMethods;
        if (matchingHandlers.size() == 0) {
            if (this.logger.isWarnEnabled()) {
                // This is where the message is logged
                this.logger.warn("Cannot find a @StreamListener matching for message with id: " + requestMessage.getHeaders().getId());
            }
            return null;
        } else if (matchingHandlers.size() <= 1) {
            DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper singleMatchingHandler = (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper)matchingHandlers.get(0);
            singleMatchingHandler.getStreamListenerMessageHandler().handleMessage(requestMessage);
            return null;
        } else {
            Iterator var3 = matchingHandlers.iterator();
            while(var3.hasNext()) {
                DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper matchingMethod = (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper)var3.next();
                matchingMethod.getStreamListenerMessageHandler().handleMessage(requestMessage);
            }
            return null;
        }
    } 
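To make the "no matching handler" branch above concrete, here is a minimal plain-Java sketch of conditional dispatch. It is not the Spring class: `ConditionalDispatchSketch`, `register`, and `dispatch` are hypothetical names, headers are modeled as a plain `Map`, and warnings are collected in a list instead of being written to a logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified stand-in for conditional listener dispatch (hypothetical, not Spring code).
class ConditionalDispatchSketch {

    // One listener: a condition evaluated on the headers, plus the handler body.
    private static final class Listener {
        final Predicate<Map<String, Object>> condition;
        final Consumer<Map<String, Object>> handler;

        Listener(Predicate<Map<String, Object>> condition, Consumer<Map<String, Object>> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    // Collected instead of logged so the behavior is easy to inspect.
    final List<String> warnings = new ArrayList<>();

    void register(Predicate<Map<String, Object>> condition, Consumer<Map<String, Object>> handler) {
        listeners.add(new Listener(condition, handler));
    }

    // Invokes every listener whose condition accepts the headers and returns how many ran.
    // When none ran, the warning is recorded and the message is dropped.
    int dispatch(String messageId, Map<String, Object> headers) {
        int invoked = 0;
        for (Listener listener : listeners) {
            if (listener.condition.test(headers)) {
                listener.handler.accept(headers);
                invoked++;
            }
        }
        if (invoked == 0) {
            warnings.add("Cannot find a @StreamListener matching for message with id: " + messageId);
        }
        return invoked;
    }
}
```

Registering a single listener with the condition `"ORDER".equals(headers.get("rocketmq_TAGS"))` and dispatching a message tagged with anything else leaves one entry in `warnings`, which is the situation described in this post.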
Solution:
1. Add the tag filter below to the yml configuration file
spring:
  cloud:
    stream:
      # Add the section below
      rocketmq:
        bindings:
          OrderStatusChangedEventInput:
            consumer:
              tags: ORDER
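Once this is configured, the RocketMQ broker does the filtering, so messages with other tags are never delivered to the service, and the message view reflects that.
Without it, the message is delivered to the service, but because the listener is declared with the header condition
, condition = "headers['rocketmq_TAGS']=='ORDER'"
no matching listener is found and the warning is logged.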
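The difference between filtering at the broker and filtering only by the listener condition can be sketched in plain Java. This is an illustrative model, not RocketMQ or Spring code: `TagFilterPlacementSketch` and both method names are hypothetical, messages are reduced to their tag strings, and a `null` subscription stands for "no tags configured".

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of where the tag filter is applied (hypothetical names throughout).
class TagFilterPlacementSketch {

    // Broker-side filtering: only messages whose tag equals the subscribed tag are delivered.
    // A null subscription models a consumer without a tag filter, so everything is delivered.
    static List<String> deliveredTags(List<String> sentTags, String subscribedTag) {
        List<String> delivered = new ArrayList<>();
        for (String tag : sentTags) {
            if (subscribedTag == null || subscribedTag.equals(tag)) {
                delivered.add(tag);
            }
        }
        return delivered;
    }

    // Listener-side condition: counts delivered messages that the condition rejects,
    // which is how many times the warning would be logged.
    static int unmatchedCount(List<String> deliveredTags, String conditionTag) {
        int unmatched = 0;
        for (String tag : deliveredTags) {
            if (!conditionTag.equals(tag)) {
                unmatched++;
            }
        }
        return unmatched;
    }
}
```

With a subscription of `ORDER`, nothing unmatched reaches the listener, so `unmatchedCount` is zero; with a `null` subscription, every message carrying another tag is delivered and counted.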
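Solution 2:
Leave tags out of the yml and filter in code, with
, condition = "headers['rocketmq_TAGS']=='ORDER'"
on the listener and an explicit tag check inside the handler. Note that, per handleRequestMessage above, messages that fail the condition still trigger the warning before the handler is reached; the in-method check is what does the filtering if the condition is removed.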
@StreamListener(value = OrderEventSink.ORDER_STATUS_CHANGED_EVENT_INPUT, condition = "headers['rocketmq_TAGS']=='ORDER'")
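    public void handleOrderStatusChangedEvent(Message<OrderStatusChangedEvent> message) {
        Object tags = message.getHeaders().get("rocketmq_TAGS");
        // "ORDER".equals(...) is null-safe: a message without the tag header is skipped instead of throwing
        if (!"ORDER".equals(tags)) {
            return;
        }
        // Business logic goes here
    }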
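If the handler needs to accept more than one tag, the in-method check can be factored into a small null-safe helper. The sketch below is an assumption-laden illustration, not part of any library: `TagExpressionSketch` and `matches` are hypothetical names, and the expression format only imitates the RocketMQ subscription style in which `||` separates alternative tags and `*` accepts everything.

```java
import java.util.Arrays;

// Hypothetical helper for checking a tag header inside a handler method.
class TagExpressionSketch {

    // Returns true when the header value matches the expression.
    // A null, blank, or "*" expression accepts any message, including one without a tag.
    // Otherwise the tag must equal one of the "||"-separated alternatives.
    static boolean matches(String expression, Object tagHeader) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        if (tagHeader == null) {
            return false;
        }
        String tag = tagHeader.toString();
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }
}
```

Inside the handler this would be used as `if (!TagExpressionSketch.matches("ORDER", message.getHeaders().get("rocketmq_TAGS"))) { return; }`, which also copes with messages that carry no tag header at all.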