1 安装Erlang(spring整合及死信队列将在下一篇讲解)
 
 
# wget http://erlang.org/download/otp_src_19.3.tar.gz
  
 
  
   # tar zxvf otp_src_19.3.tar.gz
  
 
  
   # cd otp_src_19.3
  
 
  
   # ./configure --prefix=/opt/erlang   
  
 
  
   //如果上一步报错,则执行yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
  
 
  
   # make && make install
  
 
  
# vim /etc/profile
 
  
   ERLANG HOME=/opt/erlang
  
 
  
   export PATH=$PATH:$ERLANG HOME/bin
  
 
  
   export ERLANG_HOME
  
 
  
   # source /etc/profile
  
 
  
   # erl    //检查是否安装成功
  
 
  
   # wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-generic-unix-3.6.12.tar.xz
  
 
  
   # xz -d rabbitmq-server-generic-unix-3.6.12.tar.xz   //转换压缩文件格式
  
 
  
   # cd /opt
  
 
  
   # mv rabbitmq_server-3.6.12 rabbitmq
  
 
  
   # vim /etc/profile
  
 
  
   export PATH=$PATH:/opt/rabbitmq/sbin
  
 
  
   export RABBITMQ_HOME=/opt/rabbitmq
  
 
  
   # source /etc/profile
  
 
  
   # rabbitmq-server -detached   //启动rabbitmq服务
  
 
  
   # rabbitmqctl status        //查看rabbitmq状态,也可用rabbitmqctl cluster status查看集群信息
 
 
 
2 使用java程序连接测试其生产者消费者
 
 
rabbitmq模型图如下
 
 
 
 

 
 
引入依赖
 
 
<dependency>
  
 
  
       <groupId>com.rabbitmq</groupId>
  
 
  
       <artifactId>amqp-client</artifactId>
  
 
  
       <version>4.2.1</version>
  
 
  
   </dependency>
  
 
 
   
 
 
 
  rabbitmq默认在本地通过localhost访问时账号密码是guest/guest,但是远程的话就需要设置如下:
 
 
 
   # rabbitmqctl add_user root root   //新建root用户
  
 
  
   # rabbitmqctl set_permissions -p / root ".*" ".*" ".*"   //给root授予所有权限
  
 
  
   # rabbitmqctl set_user_tags root administrator        //设置root为管理员
 
编写消费者客户端代码如下:
 
 
public class RabbitProducer {
  
 
  
       private static final String EXCHANGE_NAME = "exchange_demo";
  
 
  
       private static final String ROUTING_KEY = "routingkey_demo";
  
 
  
       private static final String QUEUE_NAME = "queue_demo";
  
 
  
       private static final String IP_ADDRESS = "xx.xxx.xxx.xx";
  
 
  
       private static final int PORT = 5672;//RabbitMQ服务端默认端口为5672
  
 
  
    
  
 
  
       public static void main(String[] args) throws IOException, TimeoutException {
  
 
  
           ConnectionFactory factory = new ConnectionFactory();
  
 
  
           factory.setHost(IP_ADDRESS);
  
 
  
           factory.setPort(PORT);
  
 
  
           factory.setUsername("root");
  
 
  
           factory.setPassword("root");
  
 
  
           Connection connection = factory.newConnection(); //创建链接
  
 
  
           Channel channel = connection.createChannel();   //创建信道
  
 
  
           // 创建一个type="direct"、持久的、非自动删除的交换器
  
 
  
           channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
  
 
  
           // 创建一个持久的、排他的、非自动删除的队列
  
 
  
           channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  
 
  
           // 将交换器与队列通过路由健绑定
  
 
  
           channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  
 
  
           // 发送一条持久化的消息:hello world
  
 
  
           String message = "hello world!";
  
 
  
           channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
  
 
  
                   MessageProperties.PERSISTENT_TEXT_PLAIN,
  
 
  
                   message.getBytes());
  
 
  
           // 关闭资源
  
 
  
           channel.close();
  
 
  
           connection.close();
  
 
  
       }
  
 
  
   }
 
 
编写消费者客户端代码如下:
 
 
public class RabbitConsumer {
  
 
  
       private static final String QUEUE_NAME = "queue_demo";
  
 
  
       private static final String IP_ADDRESS = "134.175.124.102";
  
 
  
       private static final int PORT = 5672;//RabbitMQ服务端默认端口为5672
  
 
  
    
  
 
  
       public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  
 
  
           Address[] addresses = new Address[] {
  
 
  
               new Address(IP_ADDRESS, PORT)
  
 
  
           };
  
 
  
           ConnectionFactory factory = new ConnectionFactory();
  
 
  
           factory.setUsername("root");
  
 
  
           factory.setPassword("root");
  
 
  
           // 这里的连接方式与生产者的demo略有不同,注意辨别区别
  
 
  
           Connection connection = factory.newConnection(addresses); //创建连接
  
 
  
           final Channel channel = connection.createChannel();  //创建信道
  
 
  
           channel.basicQos(64);  //设置客户端最多接收未被ack的消息的个数
  
 
  
           Consumer consumer = new DefaultConsumer(channel) {
  
 
  
               @Override
  
 
  
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  
 
  
                   super.handleDelivery(consumerTag, envelope, properties, body);
  
 
  
                   System.out.println("recv message: " + new String(body));
  
 
  
                   try {
  
 
  
                       TimeUnit.SECONDS.sleep(1);
  
 
  
                   } catch (InterruptedException e) {
  
 
  
                       e.printStackTrace();
  
 
  
                   }
  
 
  
                   channel.basicAck(envelope.getDeliveryTag(), false);
  
 
  
               }
  
 
  
           };
  
 
  
           channel.basicConsume(QUEUE_NAME, consumer);
  
 
  
           // 等待回调函数执行完毕之后,关闭资源
  
 
  
           TimeUnit.SECONDS.sleep(5);
  
 
  
           channel.close();
  
 
  
           connection.close();
  
 
  
       }
  
 
  
   }










