0
点赞
收藏
分享

微信扫一扫

javax.jms.*包 使用参考


package com.lenovo.lps.psb.common.log.reduce.avatar.amq;
import java.util.Date;
import java.util.Enumeration;import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;

public class AMQMonitor {

public AMQMonitor(){

}

public static String[] statisticsPbQueue(String amq_nodeName)throws Exception{
String result[]=new String[10];
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
//PBMQ_1_xxx
String brokerurl="";
System.out.print("brokerurl========"+brokerurl);
factory.setBrokerURL(brokerurl.trim());
Connection connection = factory.createConnection();
connection.start();
//使用事务 自动签收
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);


// for(int i=0;i<Integer.parseInt(PbmqNodesConfigResourceBindUtil.getAmqNode_queueNum());i++){
// String QueueNamekey="queueName_"+(i+1);
// String queueName=PbmqNodesConfigResourceBindUtil.getQueueNameStr(QueueNamekey);
// result[i]=statisticsPbQueue(session,queueName);
// System.out.print(result[i]);
// }

//关闭session、connection
try {
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
return result;
}

public static int statisticsQueuePaddingnum(Connection connection,String queueName)throws Exception{
int paddingnum=0;
connection.start();

//使用事务 自动签收
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

paddingnum=statisticsPbQueue(session,queueName);

return paddingnum;
}

public static void sendNewMsg(Session session,int msgNum,String queueName)throws Exception{
//创建queue 如果该队列在activemq服务器上存在 那么就获取到该queue的实例
Destination destination = session.createQueue(queueName);
//destination.
//Destination destination = session.createQueue("MONITOR-test-queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
int i = 0 ;
TextMessage message = null ;
while(i<msgNum){
i++;
message = session.createTextMessage(createMessageText(i));
producer.send(message);
Thread.sleep(80);
}
session.commit();
System.out.println("发送完毕!");

}

public static void statisticsActiveMQLocal(Session session) throws Exception{
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo); String queueName = "ActiveMQ.Statistics.Broker";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg); MapMessage reply = (MapMessage) consumer.receive();
//assertNotNull(reply); for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.out.println(name + "=" + reply.getObject(name));
}
}
public static int statisticsPbQueue(Session session,String queuename) throws InterruptedException {

String[] result=new String[4];
String result_statis="";
int pendingmsgNum=0;

Queue replyTo;
MessageConsumer consumer=null;
//发送统计探测消息
try {
replyTo = session.createTemporaryQueue();
consumer = session.createConsumer(replyTo);

Queue testQueue = session.createQueue(queuename);
//testQueue.
MessageProducer producer = session.createProducer(null);
String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName();
Queue query = session.createQueue(queueName);

//先发送test message至目标队列 自然过期 然后发送test message至统计目标后门队列 等待响应数据
Message msg = session.createMessage();
msg.setJMSExpiration(1);
producer.send(testQueue, msg);
Thread.sleep(1);
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
} catch (JMSException e1) {
e1.printStackTrace();
}

//接收统计探测结果
try{
MapMessage reply = (MapMessage) consumer.receive();
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
if("consumerCount".equalsIgnoreCase(name)){
result[1]=String.valueOf(reply.getObject(name));
}else if("enqueueCount".equalsIgnoreCase(name)){
result[2]=String.valueOf(reply.getObject(name));
}else if("dequeueCount".equalsIgnoreCase(name)){
result[3]=String.valueOf(reply.getObject(name));
}
//System.err.println(name + "=" + reply.getObject(name));
}
pendingmsgNum=Integer.parseInt(result[2])-Integer.parseInt(result[3]);

result[0]=String.valueOf(pendingmsgNum);
String queuename_STR=queuename;
result_statis=" "+queuename_STR+"\t"+result[0]+"\t\t"+result[1]+"\t\t"+result[2]+"\t"+result[3];

}catch(Exception e){
e.printStackTrace();
}
return pendingmsgNum;
}

private static String createMessageText(int index) {
int messageSize = 100 ;
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}

for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}

return buffer.toString();
}

/**
* @param args
*/
public static void main(String[] args) {

try {
AMQMonitor.statisticsPbQueue("PBMQ_1");
} catch (Exception e) {
e.printStackTrace();
}
}
}

举报

相关推荐

0 条评论