总体说明:
1. 一个生产者/发布者:可以向多个目的地发送消息;
2. 每个目的地(destination)可以有多个订阅者或消费者;
如下图所示:

程序结构:
1. Publisher.java :创建1个生产者和4个主题,遍历4个主题,生产者依次向4个主题中发送Message,共发送5次;
2. Consumer.java :创建8个消费者,每两个消费者订阅同一个主题,采用异步接收方式;
3. Listener.java :异步消息监听器,收到消息时打印消息内容。
运行结果:
生产者:

消费者:

程序代码:
Publisher.java
package com.ll.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
public class Publisher {
  private ConnectionFactory factory;
  private Connection connection = null;
  private Session session;
  private String brokerURL = "tcp://localhost:61616";
  private MessageProducer producer;
  private Destination[] destinations;
  /**
   * Opens a connection to the broker at {@code brokerURL}, starts it, and
   * creates a non-transacted, auto-acknowledge session plus a producer.
   * The producer is created with a {@code null} destination; the destination
   * is supplied on each send.
   *
   * <p>If any step after the connection is created fails, the connection is
   * closed before the exception is rethrown, so a failed construction does
   * not leave a broker connection open.
   *
   * @throws JMSException if the connection cannot be created or started, or
   *     if the session or producer cannot be created
   */
  public Publisher() throws JMSException {
    factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    try {
      connection.start();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      producer = session.createProducer(null);
    } catch (JMSException jmse) {
      try {
        connection.close();
      } catch (JMSException ignored) {
        // The setup failure is the one worth reporting; a secondary failure
        // while cleaning up must not replace it.
      }
      throw jmse;
    }
  }
  /**
   * Replaces the destination array with one topic per entry of
   * {@code stocks}. Each topic is named {@code "STOCKS." + name} and is
   * stored at the same index as the name it was built from.
   *
   * @param stocks topic name suffixes
   * @throws JMSException if the session fails to create a topic
   */
  protected void setTopics(String[] stocks) throws JMSException {
    final Destination[] topics = new Destination[stocks.length];
    // Publish the array to the field first, then fill it in place.
    destinations = topics;
    int slot = 0;
    for (String name : stocks) {
      topics[slot++] = session.createTopic("STOCKS." + name);
    }
  }
  /**
   * Builds a map message carrying the given stock name together with fixed
   * sample values: price 1.00, offer 0.01 and {@code up = true}.
   *
   * @param stock   value stored under the {@code "stock"} key
   * @param session session used to create the message (this parameter, not
   *                the instance field of the same name)
   * @return the populated map message
   * @throws JMSException if the message cannot be created or populated
   */
  protected Message createStockMessage(String stock, Session session)
      throws JMSException {
    final MapMessage quote = session.createMapMessage();
    quote.setString("stock", stock);
    quote.setDouble("price", 1.0d);
    quote.setDouble("offer", 0.01d);
    quote.setBoolean("up", true);
    return quote;
  }
  
  /**
   * Sends one map message per stock name. The message for {@code stocks[i]}
   * goes to {@code destinations[i]}, so {@link #setTopics(String[])} must
   * have been called first with at least as many names.
   *
   * @param stocks stock names, index-aligned with the configured destinations
   * @throws JMSException             if creating or sending a message fails
   * @throws IllegalStateException    if no destinations have been configured
   * @throws IllegalArgumentException if {@code stocks} has more entries than
   *                                  there are configured destinations
   */
  protected void sendMessage(String[] stocks) throws JMSException {
    // Fail with a clear message instead of a bare NullPointerException or
    // ArrayIndexOutOfBoundsException part-way through the loop.
    if (destinations == null) {
      throw new IllegalStateException(
          "setTopics must be called before sendMessage");
    }
    if (stocks.length > destinations.length) {
      throw new IllegalArgumentException("got " + stocks.length
          + " stock names but only " + destinations.length
          + " destinations are configured");
    }

    for (int i = 0; i < stocks.length; i++) {
      Destination target = destinations[i];
      Message message = createStockMessage(stocks[i], session);

      System.out.println("发送: "
          + ((ActiveMQMapMessage) message).getContentMap()
          + " on destination: " + target);

      // The producer has no fixed destination, so it is passed per send.
      producer.send(target, message);
    }
  }
  /**
   * Closes the broker connection if one was opened. A failure raised by the
   * close call is printed via {@code printStackTrace()} and not propagated.
   *
   * @throws JMSException declared in the signature; the close call's own
   *     exceptions are caught inside this method
   */
  public void close() throws JMSException {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (Exception closeFailure) {
      closeFailure.printStackTrace();
    }
  }
  /**
   * Demo entry point: publishes one message to each of four topics, five
   * times, pausing two seconds after each round, then closes the publisher.
   *
   * @param argss unused
   * @throws JMSException if connecting, configuring topics or sending fails
   */
  public static void main(String[] argss) throws JMSException {
    String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };
    Publisher publisher = new Publisher();
    try {
      publisher.setTopics(topics);
      // Publish five rounds, sleeping 2 s after each round.
      for (int i = 0; i < 5; i++) {
        System.out.println("发布者第:" + i + " 次发布消息...");
        publisher.sendMessage(topics);
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop publishing rather than
          // silently carrying on after being asked to stop.
          Thread.currentThread().interrupt();
          break;
        }
      }
    } finally {
      // Always release the broker connection, even if a send failed.
      publisher.close();
    }
  }
}
Consumer.java
package com.ll.activemq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
  private ActiveMQConnectionFactory factory;
  private String brokerURL = "tcp://localhost:61616";
  private Connection connection = null;
  private Session session;
  /**
   * Opens a connection to the broker at {@code brokerURL}, starts it, and
   * creates a non-transacted, auto-acknowledge session.
   *
   * <p>If starting the connection or creating the session fails, the
   * connection is closed before the exception is rethrown, so a failed
   * construction does not leave a broker connection open.
   *
   * @throws JMSException if the connection cannot be created or started, or
   *     if the session cannot be created
   */
  public Consumer() throws JMSException {
    factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    try {
      connection.start();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    } catch (JMSException jmse) {
      try {
        connection.close();
      } catch (JMSException ignored) {
        // The setup failure is the one worth reporting; a secondary failure
        // while cleaning up must not replace it.
      }
      throw jmse;
    }
  }
  /**
   * Returns the session created by the constructor.
   *
   * @return this consumer's session
   */
  public Session getSession() {
    return this.session;
  }
  /**
   * Demo entry point: for each of four topics, creates two consumers on the
   * shared session and attaches a fresh {@link Listener} to each of them.
   *
   * @param args unused
   * @throws JMSException if connecting or creating a topic/consumer fails
   */
  public static void main(String[] args) throws JMSException {
    final String[] topics = { "MyTopic1", "MyTopic2", "MyTopic3", "MyTopic4" };
    final Consumer consumer = new Consumer();
    final Session session = consumer.getSession();

    for (int t = 0; t < topics.length; t++) {
      // Destination for this topic name.
      final Destination topic = session.createTopic("STOCKS." + topics[t]);

      // Create both subscribers first ...
      final MessageConsumer[] pair = new MessageConsumer[2];
      for (int k = 0; k < pair.length; k++) {
        pair[k] = session.createConsumer(topic);
      }

      // ... then register a listener on each, in creation order.
      for (int k = 0; k < pair.length; k++) {
        pair[k].setMessageListener(new Listener());
      }
    }
  }
}
Listener.java
package com.ll.activemq;
import java.text.DecimalFormat;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
public class Listener implements MessageListener {  
      
    /**
     * Asynchronous receive callback, invoked when a message arrives.
     * Reads the {@code stock}, {@code price}, {@code offer} and {@code up}
     * entries from the map message and prints them as one tab-separated
     * line. Any exception raised while handling the message is printed via
     * {@code printStackTrace()} and not rethrown.
     *
     * @param message the received message; it is cast to {@code MapMessage}
     */
    public void onMessage(Message message) {
        try {
            final MapMessage quote = (MapMessage) message;
            final String stock = quote.getString("stock");
            final double price = quote.getDouble("price");
            final double offer = quote.getDouble("offer");
            final boolean up = quote.getBoolean("up");

            final DecimalFormat money = new DecimalFormat("#,###,###,##0.00");
            final String direction;
            if (up) {
                direction = "up";
            } else {
                direction = "down";
            }

            final StringBuilder line = new StringBuilder("接收消息:");
            line.append(stock).append("\t")
                .append(money.format(price)).append("\t")
                .append(money.format(offer)).append("\t")
                .append(direction);
            System.out.println(line.toString());
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
  
}
延伸:
若是将上述程序中的 createTopic 全部替换为 createQueue(队列为点对点模式,每条消息只会被订阅该队列的其中一个消费者消费),则运行结果如下:
生产者输出不变,这里就不截图了。
下面是消费者消费情况截图:
  










