0
点赞
收藏
分享

微信扫一扫

springboot项目中使用rabbitmq


第一步:安装RabbitMq

Rabbit安装
入门安装

后台管理
权限管理
解决用户无法远程登录
日志文件异常

插件问题(关闭后无法启动)

cd /var/lib/rabbitmq/mnesia/
#删除一下文件
rm -rf rabbit@cluster
rm -rf rabbit@cluster.pid

添加系统环境:

PATH=$PATH:/usr/lib/rabbitmq/bin

添加管理模块

# 添加web管理插件
rabbitmq-plugins enable rabbitmq_management

常见命令:

启动服务:rabbitmq-server -detached【 /usr/local/rabbitmq/sbin/rabbitmq-server  -detached 】
如果rabbitmq-server 启动命令,该命令ctrl+c后会关闭服务
查看状态:rabbitmqctl status【 /usr/local/rabbitmq/sbin/rabbitmqctl status 】
关闭服务:rabbitmqctl stop【 /usr/local/rabbitmq/sbin/rabbitmqctl stop 】
列出角色:rabbitmqctl list_users

验证:
页面:http://127.0.0.1:15672/
默认账号和密码:guest/guest

springboot项目中使用rabbitmq_rabbitmq

第二步:在springboot中简单地使用

依赖

<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.43</version>
</dependency>

application.properties

##配置rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#自定义参数:并发消费者的初始化值
spring.rabbitmq.listener.concurrency=10
#自定义参数:并发消费者的最大值
spring.rabbitmq.listener.max-concurrency=20
#自定义参数:每个消费者每次监听时可拉取处理的消息数量
spring.rabbitmq.listener.prefetch=5

#字符串型的消息
string.queue.name=string.queue.name
string.exchange.name =string.exchange.name
string.routing.key.name=string.routing.key.name
#object型的消息
object.queue.name=object.queue.name
object.exchange.topic.name =object.exchange.topic.name
object.routing.key.topic.name=*.topic.*

配置类:RabbitmqConfig

/**
* rabbitmq配置类
*/
@Configuration
public class RabbitmqConfig {


@Autowired
private Environment env;

@Autowired
private CachingConnectionFactory connectionFactory;

@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

//日志记录器
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);

/**
* 单一消费者,即为队列消息
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}

/**
* 多个消费者,即主题消息
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);

//关于并发配置
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
return factory;
}

/**
*RabbitTemplate工具类
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//默认是开启应答成功后的回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}

/**生成rabbitmq组件
* 队列:用来存储消息的数据结构,位于硬盘或内存中。
* 交换机:接收发送到RabbitMQ中的消息并决定把他们投递到那个队列的组件;
* 绑定:一套规则,用于告诉交换器消息应该被存储到哪个队列。
*
* **/

/**字符型消息**/
//定义队列
@Bean(name ="stringQueue")
public Queue stringQueue(){
return new Queue(env.getProperty("string.queue.name"),true);
}
//定义交换机 DirectExchange为队列消息交换器
@Bean
public DirectExchange stringExchange(){
return new DirectExchange(env.getProperty("string.exchange.name"),true,false);
}
//定义绑定
@Bean
public Binding stringBinging(){
return BindingBuilder.bind(stringQueue()).to(stringExchange()).with(env.getProperty("string.routing.key.name"));
}


/**object型消息**/
//定义队列
@Bean(name ="objectQueue")
public Queue objectQueue(){
return new Queue(env.getProperty("object.queue.name"),true);
}
//定义交换机 TopicExchange为主题消息交换器
@Bean
public TopicExchange objectExchange(){
return new TopicExchange(env.getProperty("object.exchange.topic.name"),true,false);
}
//定义绑定
//主题消息指出以正则表达式的方式进行绑定,这里的模糊匹配值的是:exchange的name满足bind的规则
//在主题消息中一个队列可以同时发送给不同消费者 即一个队列可以满足多个交换器的绑定规则
@Bean
public Binding objectBinging(){
return BindingBuilder.bind(objectQueue()).to(objectExchange()).with( env.getProperty("object.routing.key.topic.name"));
}

/*
@Bean
public Binding objectBinging2(){
return BindingBuilder.bind(objectQueue()).to(objectExchange()).with("com.#");
}*/

}

生产者:RabbitMqController

/**
* 这是测试模块的Controller
*/
@RestController
@RequestMapping("/attendance/rabbitmq")
@Api(value = "测试管理")
public class RabbitMqController {


@Autowired
RabbitTemplate rabbitTemplate;


@Autowired
private Environment env;


//日志记录器
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);

@PostMapping("/messageString")
@ApiOperation(value = "字符串型消息生产者")
public ResponseResult messageString(HttpServletRequest request) {
logger.info("start RabbitMqController.messageString");
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("string.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("string.routing.key.name"));
try {
Message message = MessageBuilder.withBody("发送信息".getBytes("UTF-8")).build();
//设置请求编码格式
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
logger.error("字符串型消息生产者发送消息失败"+e.getMessage(),e);
}
logger.info("end RabbitMqController.messageString");
return ResponseResult.success( ConstantsUtil.OPERATE_SUCCESS);
}


@PostMapping("/messageObject")
@ApiOperation(value = "object型消息生产者")
public ResponseResult messageObject(HttpServletRequest request) {
logger.info("start RabbitMqController.messageObject");
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("object.exchange.topic.name"));
rabbitTemplate.setRoutingKey(env.getProperty("object.routing.key.topic.name"));


UserInfoPojo userInfoPojo =new UserInfoPojo();
userInfoPojo.setUserName("luo");
userInfoPojo.setPassword("123");
try {
Message message = MessageBuilder.withBody(JacksonUtils.toJson(userInfoPojo).getBytes("UTF-8")).build();
//设置请求编码格式
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
logger.error("object型消息生产者发送消息失败"+e.getMessage(),e);
}
logger.info("end RabbitMqController.messageObject");
return ResponseResult.success( ConstantsUtil.OPERATE_SUCCESS);
}
}

消费者:CommonMqListener

/**
* 这里是消息队列的消费者
*/


@Component
public class CommonMqListener {

private static ObjectMapper objectMapper = new ObjectMapper();

//日志记录器
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
/**
* 监听消费消息
* @param message
*/
@RabbitListener(queues = "${string.queue.name}",containerFactory = "singleListenerContainer")
public void consumeStringQueue(@Payload byte[] message) {
try {
logger.info("监听消费 监听到消息: {} ", new String(message,"UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 监听消费消息
* @param message
*/
@RabbitListener(queues = "${object.queue.name}",containerFactory = "multiListenerContainer")
public void consumeObjectQueue(@Payload byte[] message) {
try {
UserInfoPojo userInfoPojo = objectMapper.readValue(message,UserInfoPojo.class);
logger.info("监听消费 监听到消息: {} ", userInfoPojo.toString());
} catch (Exception e) {
e.printStackTrace();
}
}


}

验证结果:

springboot项目中使用rabbitmq_字符串_02


springboot项目中使用rabbitmq_java_03

springboot项目中使用rabbitmq_队列_04


举报

相关推荐

0 条评论