0
点赞
收藏
分享

微信扫一扫

【ActiveMQ入门-4】ActiveMQ学习-异步接收

总体说明:

1. 一个生产者/发布者:可以向多个目的地发送消息;

2. 每个目的地(destination)可以有多个订阅者或消费者;

如下图所示:

【ActiveMQ入门-4】ActiveMQ学习-异步接收_apache




程序结构:

1. Publisher.java  :创建1个生产者和4个主题,遍历4个主题,生产者依次向4个主题中发送Message,共发送5次;

2. Consumer.java  :消费者,共创建8个消费者,每两个消费者订阅同一个主题,采用异步接收方式;

3. Listener.java   :异步监听



运行结果:

生产者:

【ActiveMQ入门-4】ActiveMQ学习-异步接收_ActiveMQ_02

消费者:

【ActiveMQ入门-4】ActiveMQ学习-异步接收_java_03



程序代码:

Publisher.java

package com.ll.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

/**
 * Publishes stock-quote {@link MapMessage}s to a set of topics through a single
 * connection, session and producer.
 *
 * <p>Typical use: construct, call {@link #setTopics(String[])} once, call
 * {@link #sendMessage(String[])} as often as needed, then {@link #close()}.
 */
public class Publisher {

    private ConnectionFactory factory;
    private Connection connection = null;
    private Session session;
    private String brokerURL = "tcp://localhost:61616";
    private MessageProducer producer;
    private Destination[] destinations;

    /**
     * Opens and starts the connection, then creates the session and the producer.
     *
     * <p>If any step after the connection is created fails, the connection is closed
     * before the exception is rethrown so that it is not leaked.
     *
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // No fixed destination: the target is supplied on every send() call.
            producer = session.createProducer(null);
        } catch (JMSException jmse) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Ignored on purpose: the original failure is the one worth reporting.
            }
            throw jmse;
        }
    }

    /**
     * Creates one topic per name, each prefixed with {@code "STOCKS."}, and stores
     * them as the destinations used by {@link #sendMessage(String[])}.
     *
     * @param stocks topic names (without the prefix)
     * @throws JMSException if a topic cannot be created
     */
    protected void setTopics(String[] stocks) throws JMSException {
        destinations = new Destination[stocks.length];
        for (int i = 0; i < stocks.length; i++) {
            destinations[i] = session.createTopic("STOCKS." + stocks[i]);
        }
    }

    /**
     * Builds the map message sent for one stock.
     *
     * @param stock   value stored under the {@code "stock"} key
     * @param session session used to create the message
     * @return a map message with the keys {@code stock}, {@code price}, {@code offer}
     *         and {@code up}
     * @throws JMSException if the message cannot be created or populated
     */
    protected Message createStockMessage(String stock, Session session)
            throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("stock", stock);
        message.setDouble("price", 1.00);
        message.setDouble("offer", 0.01);
        message.setBoolean("up", true);

        return message;
    }

    /**
     * Sends one map message per entry of {@code stocks}; entry {@code i} goes to the
     * {@code i}-th destination configured by {@link #setTopics(String[])}.
     *
     * @param stocks topic names, in the same order as passed to {@code setTopics}
     * @throws JMSException          if a message cannot be created or sent
     * @throws IllegalStateException if fewer destinations are configured than names
     *                               are given (including when {@code setTopics} was
     *                               never called)
     */
    protected void sendMessage(String[] stocks) throws JMSException {
        // Fail with a clear message instead of a bare NPE / index error further down.
        if (destinations == null || destinations.length < stocks.length) {
            throw new IllegalStateException(
                    "setTopics() must be called with at least " + stocks.length
                            + " topic name(s) before sendMessage()");
        }

        for (int i = 0; i < stocks.length; i++) {
            Message message = createStockMessage(stocks[i], session);

            System.out.println("发送: "
                    + ((ActiveMQMapMessage) message).getContentMap()
                    + " on destination: " + destinations[i]);

            producer.send(destinations[i], message);
        }
    }

    /**
     * Closes the connection if one was opened. Failures are printed rather than
     * propagated, so this is safe to call from a {@code finally} block.
     *
     * @throws JMSException declared for interface compatibility; not thrown here
     */
    public void close() throws JMSException {
        try {
            if (null != connection) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes five rounds of messages to four topics, pausing 2 seconds after each
     * round, and always closes the connection afterwards.
     */
    public static void main(String[] argss) throws JMSException {

        String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };

        Publisher publisher = new Publisher();
        try {
            publisher.setTopics(topics);

            // Five rounds, with a 2 s pause after each one.
            for (int i = 0; i < 5; i++) {
                System.out.println("发布者第:" + i + " 次发布消息...");
                publisher.sendMessage(topics);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the flag and stop publishing: we were asked to stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the connection even when publishing fails part-way.
            publisher.close();
        }
    }
}

Consumer.java

package com.ll.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Owns one started connection and one auto-acknowledge session, from which
 * {@link #main(String[])} creates the asynchronous topic consumers.
 */
public class Consumer {

    private ActiveMQConnectionFactory factory;
    private String brokerURL = "tcp://localhost:61616";
    private Connection connection = null;
    private Session session;

    /**
     * Opens and starts the connection and creates the session.
     *
     * <p>If starting the connection or creating the session fails, the connection is
     * closed before the exception is rethrown so that it is not leaked.
     *
     * @throws JMSException if the connection or session cannot be created
     */
    public Consumer() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException jmse) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Ignored on purpose: the original failure is the one worth reporting.
            }
            throw jmse;
        }
    }

    /**
     * @return the session created by the constructor
     */
    public Session getSession() {
        return session;
    }

    /**
     * Closes the connection if one was opened.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Creates two consumers on each of four {@code STOCKS.*} topics and attaches a
     * new {@link Listener} to each, for eight asynchronous consumers in total.
     *
     * <p>The connection is deliberately not closed when setup succeeds, because the
     * listeners are meant to keep receiving afterwards; it is closed by a shutdown
     * hook at JVM exit, or immediately if setup fails.
     */
    public static void main(String[] args) throws JMSException {
        String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };

        final Consumer consumer = new Consumer();

        // Release the connection when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }));

        try {
            for (String stock : topics) {
                Destination destination = consumer.getSession().createTopic(
                        "STOCKS." + stock);

                // Two independent consumers subscribe to the same topic.
                MessageConsumer messageConsumer = consumer.getSession()
                        .createConsumer(destination);
                MessageConsumer messageConsumer2 = consumer.getSession()
                        .createConsumer(destination);

                messageConsumer.setMessageListener(new Listener());
                messageConsumer2.setMessageListener(new Listener());
            }
        } catch (JMSException e) {
            // Setup failed part-way: do not leave a half-configured connection open.
            try {
                consumer.close();
            } catch (JMSException ignored) {
                // Ignored on purpose: the setup failure is the one worth reporting.
            }
            throw e;
        }
    }

}

Listener.java

package com.ll.activemq;

import java.text.DecimalFormat;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class Listener implements MessageListener {

/**
* 异步接收
* 当有消息时,就会触发该事件
*/
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage)message;
String stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");

DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
System.out.println("接收消息:"+stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
} catch (Exception e) {
e.printStackTrace();
}
}

}



延伸:

若是将上述程序中的 createTopic 全部替换为 createQueue,即由发布/订阅模式改为点对点(队列)模式,则每条消息只会被监听同一队列的两个消费者中的一个接收,而不再是两个消费者各收到一份。运行结果如下:

生产者输出不变,这里就不截图了。

下面是消费者消费情况截图:

【ActiveMQ入门-4】ActiveMQ学习-异步接收_apache_04  



举报

相关推荐

0 条评论