0
点赞
收藏
分享

微信扫一扫

SpringAMQP使用心得

ffLNSpringAMQP 
AMQP 
Advanced Message >Queuing Protocol, 
spring AMQP 
spring

 

1.Basic Queue 简单队列模型

案例实现

2. 
3. 
ERIE* SIA spring-amqpfifiil 
YEsimple.queuei*Tßk51J

 

SlÄAMQPfiÜ 
<dependency> 
<groupld.org . springframework . boot</groupld> 
<artifactld>spring-boot- starter

 

spring : 
rabbitmq : 
host: 192.168.150.101 g 
port: 5672 # 
virtual-host: / # 
username: itcast # 
password: 123521 
@SpringBootTest 
public class SpringAmqpTest { 
@Autowired 
private RabbitTempIate rabbitTemp1ate; 
@Test 
public void testSimpteQueue() { 
String queueName = 
"simpte . queue" , 
String message "hello, spring amqp!"; 
rabbitTemp1ate . convertAndSend (queueName , 
message) ;

总结:

{+/ĂRAMQP? 
SpringAMQP2[lfiJkiŽi5A? 
FIJËRabbitTempIateficonvertAndSendfi*

 

KDfisimple.queue 
spring: 
rabbitmq : 
host: 192.168.15€.101 # 
port: 5672 # 
virtual-host: / # 
username: itcast 
password: 123321 
@Component 
public class SpringRabbitListener { 
"simple.queue") 
public void msg) throws InterruptedException { 
: + msg + "l "

总结:

SpringAMQP 如 何 接 收 消 息 ? 
引 入 arnqp 的 starter 依 赖 
配 置 RabbitMQ 地 址 
定 义 类 , 添加.Com/onent注解 
类 中 声 明 方 法 , 添加@RabbitListener注解 , 方 法 参 数 就 时 消 息 
注 意 : 消 息 一 旦 消 费 就 会 从 队 列 删 除 , RabbitMQ 没 有 消 息 回 溯 功 能

 

2.Work Queue 工作队列模型

Work Queue 
Work queue, 
publisher 
consumerl 
queue 
consumer2

案例实现

0 
模 拟 WorkQueue, 实 现 一 个 队 列 绑 定 多 个 消 费 者 
基 本 思 路 如 下 : 
. 在 pub [ isher 服 务 中 定 义 测 试 方 法 , 每 秒 产 生 50 条 消 息 , 发 送 到 simple.queue 
在 consumer 服 务 中 定 义 两 个 消 息 监 听者@ 都 监 Ofisimple.queue 队 列 
2 . 
消 费 者 1 每 秒 处 理 50 条 消 息 , 消 费 者 2 每 秒 处 理 10 条 消 息 
3 ·

出现一个问题,两个消费者的处理速度不同,而消息的分发是平均的,

所以处理慢的一方仍需要处理大量消息造成延时。

解决方案

unpreFetchi*4fA, 
spring : 
rabbitmq: 
host: 192.168.150.101 # 
port: 5672 # 
virtual-host: / # 
username: itcast # 
password: 123321 
listener : 
simple : 
prefetch :

总结

ork 模 型 的 使 用 . 
. 多 个 消 费 者 绑 定 到 一 个 队 列 , 同 一 条 消 息 只 会 被 一 个 
消 费 者 处 理 
通 过 设 置 prefet 〔 h 来 控 制 消 费 者 预 取 的 消 息 数 量

3.发布(Publish)、订阅(Subscribe) 队列模型

( Publish ) iTiN ( Subscribe ) 
• Fanout: 
• Direct: 
• Topic: 
publisher 
exchan 
queuel 
queue2 
consumerl 
consumer2 
consumer3

 

(1)发布订阅 Fanout Exchange

Fanout Exchange 
publisher 
anout 
exchange 
—ENGfiqueue 
queuel 
queue2 
consumerl 
consumer2

案例实现

 

2. 
3. 
publisher 
fflJk0fifanout.queue1*Ofanout.queue2 
itcast. fanout 
exchange 
fanout. queuel 
fanout. queue2 
consumerl 
consumer2

 

 

 

: EconsumerBåFBÉExchanges 
Declarable 
AbstractDec1arabte 
Queues 
4 
AbstractExchange 
HeadersExchange 
DirectExchange 
FanoutExchange 
Binding 
TopicExchange

 

: Econsumerh&FBÉExchanges Queues Binding 
ijQ@Configurationu, Queue*0$ßiE 
@Configuration 
public class FanoutConfig { 
// B*anoutEXChangeÆÆVl 
@Bean 
public FanoutExchange 
return new .fanout") ; 
@Bean 
public Queue 
return new . queuel") ; 
@Bean 
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange 
return BindingBuiLder. bind(fanoutQueue1) . to(fanoutExchange) ;

总结

交 换 机 的 作 用 是 什 么 ? 
接 收 pub 躬 her 发 送 的 消 息 
将 消 息 按 照 规 则 路 由 到 与 之 绑 定 的 队 列 
不 能 缓 存 消 息 , 路 由 失 败 , 消 息 丢 失 
· FanoutE × change 的 会 将 消 息 路 由 到 每 个 绑 定 的 队 列 
声 明 队 列 、 交 换 机 、 绑 定 关 系 的 Bean 是 什 么 ? 
QUeUe 
· Fa noutExchange 
Binding

 

(2)发布订阅 Direct Exchange

Direct Exchange (routes) o 
Keys We Routing Key—Äfibk51J 
red 
publisher 
bindingKey : 
bindingKey: blue 
queuel 
D rect 
exchange 
queue2 
bindingKey: yellow 
bindingKey: 
key : blue 
consumerl 
key: red 
key : yellow 
consumer2 
key red J 
red

 

 

案例实现

1. Queue, RoutingKey 
bindingKey: blue. red 
publisher 
direct. queuel 
itcast. direct 
exchange 
direct . queue2 
bindingKey: yellow, 
consumerl 
consumer2 
red

 

: Econsumerh&FBhExchanges Queue 
1. "IJklfidirect.queueIKldirect.queue2, 
2. Queue, RoutingKey 
= @QueueBinding( 
value = = "direct.queuel"), 
exchange = @Exchange(name = "itcast.direct", 
type 
key {"red", "blue"} 
public void listenDirectQueue1(String 
system. 
("+msg+"l " 
value = "direct.queue2"), 
= @Exchange(name = 
"itcast. direct" , 
exchange 
type 
= ExchangeTypes .DIRECT) , 
= ExchangeTypes.DIRECT), 
key = {"red", "yellow"} 
public void listenDirectQueue2(String msg) { 
system. 
( "+msg+" I

 

@Test 
public void testDirectExchange() { 
String exchangeName = 
" itcast. direct" ; 
String message = 
// RoutingKey, 
rabbitTempIate. convertAndSend (exchangeName , 
"red", message);

 

总结

描 述 下 Direct 交 换 机 与 Fanout 交 换 机 的 差 异 ? 
Fanout 交 换 机 将 消 息 路 由 给 每 一 个 与 之 绑 定 的 队 列 
Direct 交 换 机 根 据 RoutingKey 判 断 路 由 给 哪 个 队 列 
如 果 多 个 队 列 具 有 相 同 的 RoutingKey , 则 与 Fanout 功 能 类 似 
基 于@RabbitListener注 解 声 明 队 列 和 交 换 机 有 哪 些 常 见 注 解 ? 
@Queue 
@Exchange

 

(3)发布订阅 Topic Exchange

TopicExchange5DirectExchange%Å, 
Queue; : 
n 
*: 
publisher 
ng ey: 
queuel 
op c 
exchange 
china . 
china . 
japan. 
news 
weather 
news 
japan .weather 
indingKey: japan. 
queue2 
ndingKey: #.weath 
queue3 
indingKey: #.n 
queue4 
consumerl 
consumer2 
consumer3 
consumer4

 

案例实现

*IJESpringAMQPETopicExchangeäkJæÆ 
2. 
3. 
Queue. RoutingKey 
EconsumerHß*, h%lJkdfitopic.queueIKltopic.queue2 
fiitcast. 
publisher 
bindin Ke 
china. # 
topic. queuel 
itcast. topic 
exchange 
bindingKey: news 
topic. queue2 
consumerl 
consumer2

 

: EconsumerBåFBÉExchanges Queue 
1. h%lJk0fitopic.queue1$ßtopic.queue2, 
2. Queue, RoutingKey 
@Queue(name = "topic. queuel") , 
value = 
"itcast . topic", 
exchange 
type 
key "china. 
public void listenTopicQueue1(String msg) { 
System. out. print In ( : 
( "+msg+"l 
@RabbitListener(bindings = @QueueBinding( 
value = = "topic. queue2" 
exchange = @Exchange(name = "itcast . topic", type 
key news" 
public void listenTopicQueue2(String 
System. out. print tn ( "N*2ÆüßTopicmn: 
t "+msg+"l 
= ExchangeTypes. TOPIC) , 
= ExchangeTypes. TOPIC) ,

 

@Test 
public void testTopicExchange() { 
String exchangeName = "itcast.topic"; 
String message : 
rabbitTemptate . convertAndSend(exchangeName , 
"china. news" , 
message) ;

 

 

 

(4)SpringAMQP-消息转换器

案例及讲解

测 试 发 送 O bj ect 类 型 消 息 
说 明 : 在 SpringAMQPfi 发 送 方 法 中 , 接 收 消 息 的 类 型 是 Obje 也 就 是 说 我 们 可 以 发 送 任 意 对 象 类 型 
的 消 息 , SpringAMQP 会 帮 我 们 序 列 化 为 字 节 后 发 送 。

 

ihB,9: 
fijwn, 
@Bean 
public Queue objectMessageQueue(){ 
return new Queue("object.queue"); 
@Test 
public void testSendMap() throws InterruptedException { 
Map<String, Object> msg : new ; 
msg. "Jack"); 
21); 
rabbitTempLate. convertAndSend ( "object. queue" , 
msg);

 

 

消息转换器

(1)消息发送者

<groupld.com . fasterxml. jackson . datafo 
ackson -dataformat- xm1</ artifactld> 
<version>2.9.10</version> 
< / dependency> 
üfIGpublisherH%ÄBhMessageConverter: 
@Bean 
public MessageConverter 
return new Jackson2JsonMessageConverter();

 

(2)消息接受者

 

<dependency> 
<groupld.com . fasterxml. jackson . 
-datafo artifactld> 
@Bean 
public MessageConverter 
return new Jackson2JsonMessageConverter() ; 
"object. queue") 
public void listenObjectQueue (Map<String, Object> msg) { 
l" + msg + "l

 

总结

SpringAMQP 中 消 息 的 序 列 化 和 反 序 列 化 是 怎 么 实 现 的 ? 
利 用 Messageconverter 实 现 的 默 认 是 」 DK 的 序 列 化 
注 意 发 送 方 与 接 收 方 必 须 使 用 相 同 的 MessageConverter

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

举报

相关推荐

0 条评论