题目要求重述
开发环境+系统环境
Windows 10 + eclipse(2022-03) + Java 1.8 + ActiveMQ 5.16.4(with Java 8+ support)+Xchart(visualization)
ActiveMQ下载地址:ActiveMQ
(国内下载可能比较慢,注意要下载和当前Java版本支持的对应ActiveMQ版本)
ActiveMQ安装:ActiveMQ的安装与使用_qq_29651203的博客-CSDN博客_activemq安装与配置
XChart教程及示例:XChart Example Code – Knowm.org
XChart实时折线图Demo:【java】XChart实时折线图_CallMeJacky的博客-CSDN博客_xchart
XChart在Maven中pom.xml配置如下:
<dependency>
<groupId>org.knowm.xchart</groupId>
<artifactId>xchart</artifactId>
<version>3.8.1</version>
</dependency>
具体设计
Java工程文件结构
架构图
其中:绿色背景表示该类的主要职责类似Publisher,粉色背景表示该类的主要职责类似Subscriber,绿粉相间的则既有Publisher也有Subscriber的功能。蓝色的则为Listener,负责侦听信息。白底的为Topic(主题)。
各.java文件说明
随机信号产生器
RandomSignalProducer.java
负责生成随机信号并通过sendMessage()函数传递随机数给SIGNAL主题。
随机信号统计分析
SignalAnalysisListener.java
负责侦听SIGNAL主题的信息并进行收集处理,以及生成实时数据显示的统计信息,通过MOM发布给VIEW主题。
SignalAnalysisConsumer.java
负责订阅SIGNAL主题,并通过对应的Listener进行侦听,采取的通信方式为异步通信。
实时数据显示
SignalListener.java
负责实时侦听SIGNAL主题的信息并进行可视化处理并输出折线图。(使用XChart)
ResultListener.java
负责侦听VIEW主题,输出侦听到的经过序列化的字符串信息。
ResultViewConsumer.java
负责订阅VIEW和SIGNAL主题,并通过对应的Listener进行侦听,采取的通信方式为异步通信。
实机演示结果
初始化(运行各xxConsumer.java)
ActiveMQ初始化结果(Topic部分,暂未撤销之前处理过的数据)
运行RandomSignalProducer.java:
注,为防止死循环,每次调用RandomSignalProducer生成100个随机数,而非一直生成,可以不断调用该类来实现持续生成。
ActiveMQ此时的信息(Topic部分):
可以看到,SIGNAL的Enqueued数增加了100,Dequeued数增加了200,再次验证了Topic法通信时将单份信息重复分发给不同Consumer各一份的特点。
源代码部分
RandomSignalProducer
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class RandomSignalProducer {
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageProducer producer;
private Topic topic;
public RandomSignalProducer(String topicName) throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(topicName);
producer = session.createProducer(topic);
connection.start();
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
RandomSignalProducer publisher = new RandomSignalProducer("SIGNAL");
publisher.sendMessage();
publisher.close();
}
public void sendMessage() throws JMSException {
try {
int maxSize = 100;
double std_dev = 2.0;
double mean = 20;
Message msg;
for(int i = 0 ; i < maxSize; i++) {
Random r = new Random();
double rd = r.nextGaussian() * std_dev + mean;
msg = session.createObjectMessage(rd);
producer.send(msg);
System.out.println("Random double generated: " + rd);
//sleep duration is 100 ms
Thread.sleep(100);
}
} catch(InterruptedException ie) {
ie.printStackTrace();
}
}
}
ResultListener
//import java.util.ArrayList;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* This class is served as the display of real-time signal's statistics.
* @author Fa1con_JS, Xidian University*/
public class ResultListener implements MessageListener{
public void onMessage(Message message) {
try {
//ArrayList<Double> received = (ArrayList<Double>) ((ObjectMessage)message).getObject();
//System.out.println("Received a double message: "+ received.get(0) + " " + received.get(1));
//get statistics of the random signal sequence
System.out.println("Received a signal statistic message: ");
System.out.println(((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
ResultViewConsumer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ResultViewConsumer {
public static void main(String[] args) throws JMSException {
String brokerURL = "tcp://localhost:61616";
ConnectionFactory factory = null;
Connection connection = null;
Session session = null;
Topic resultTopic = null;
Topic drawTopic = null;
MessageConsumer resultConsumer = null;
MessageConsumer drawConsumer = null;
ResultListener resultListener = null;
SignalListener drawListener = null;
try {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
resultTopic = session.createTopic("VIEW");
drawTopic = session.createTopic("SIGNAL");
resultConsumer = session.createConsumer(resultTopic);
drawConsumer = session.createConsumer(drawTopic);
resultListener = new ResultListener();
drawListener = new SignalListener();
resultConsumer.setMessageListener(resultListener);
drawConsumer.setMessageListener(drawListener);
connection.start();
System.out.println("Press any key to exit.");
System.in.read(); // Pause
} catch (Exception e) {
e.printStackTrace();
} finally {
connection.close();
}
}
}
SignalAnalysisConsumer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
//import javax.jms.Message;
//import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SignalAnalysisConsumer {
public static void main(String[] args) throws JMSException {
String brokerURL = "tcp://localhost:61616";
ConnectionFactory factory = null;
Connection connection = null;
Session session = null;
Topic topic = null;
//MessageConsumer messageConsumer = null;
MessageConsumer signalAnalyser = null;
//MessageConsumer signalVisualizer = null;
//SignalListener listener = null;
SignalAnalysisListener sa = null;
//SignalVisualizaion sv = null;
try {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("SIGNAL");
//messageConsumer = session.createConsumer(topic);
signalAnalyser = session.createConsumer(topic);
//signalVisualizer = session.createConsumer(topic);
//listener = new SignalListener();
// "VIEW" is the topic-to-sent, since SignalAnalysisListener is a hybrid of Consumer and Producer.
sa = new SignalAnalysisListener("VIEW");
//sv = new SignalVisualizaion();
//messageConsumer.setMessageListener(listener);
signalAnalyser.setMessageListener(sa);
//signalVisualizer.setMessageListener(sv);
connection.start();
System.out.println("Press any key to exit.");
System.in.read(); // Pause
} catch (Exception e) {
e.printStackTrace();
} finally {
connection.close();
}
}
}
SignalAnalysisListener
import java.util.ArrayList;
//import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* This class serves as the signal analysis listener, which helps the Consumer who subscribes to the "SIGNAL"
* topic to deal with the received data, and then serves as a Producer(Publisher) of the "VIEW" topic to produce
* statistics of current signal sequences(e.g. MAX, MIN value)
* @author Fa1con_JS, Xidian University*/
public class SignalAnalysisListener implements MessageListener {
private double N;
private double max, min;
private ArrayList<Double> currList;
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageProducer producer;
private Topic topic;
/**
* @param topicName the producer's topic
* @author Fa1con_JS*/
public SignalAnalysisListener(String topicName) throws JMSException { // N's default value is set to be 10.
// TODO Auto-generated constructor stub
this.currList = new ArrayList<Double>();
this.max = Double.NEGATIVE_INFINITY;
this.min = Double.POSITIVE_INFINITY;
this.N = 10;
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(topicName);
producer = session.createProducer(topic);
connection.start();
}
/**
* @param topicName the producer's topic
* @param n size of the N(a constant)
* @author Fa1con_JS*/
public SignalAnalysisListener(String topicName, int n) throws JMSException { //with N self-designed of n.
this.currList = new ArrayList<Double>();
this.max = Double.NEGATIVE_INFINITY;
this.min = Double.POSITIVE_INFINITY;
this.N = n;
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(topicName);
producer = session.createProducer(topic);
connection.start();
}
public void onMessage(Message message) {
try {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(topic);
connection.start();
Double rdReceived = (Double) ((ObjectMessage)message).getObject();
//ArrayList<String, Double> tmp = new ArrayList<String, Double>();
//ArrayList<Double> tmp = new ArrayList<Double>();
String tmp = ""; // tmp saves the statistics of the current sequence, including: max, min, mean, variance
double rd = rdReceived.doubleValue();
max = max > rd ? max : rd; //update max value
min = min < rd ? min : rd; //update min value
//tmp.put("max", max);
tmp += "max: ";
tmp += max + ", ";
//tmp.put("min", min);
//tmp.add(max);
tmp += "min: ";
tmp += min + ", ";
//show max and min values
System.out.println();
System.out.println("++++++++++++++++++++++++++++++++++++++++++");
System.out.println("current MAX is: " + max + ".");
System.out.println("current MIN is: " + min + ".");
System.out.println("++++++++++++++++++++++++++++++++++++++++++");
System.out.println();
//if list's size is bigger than N, remove the beginning element and show new mean and variance value
if(currList.size() < N) {
currList.add(rd);
} else {
currList.remove(0);
currList.add(rd);
double avg = getAvg();
double var = getVar(avg);
System.out.println();
System.out.println("==---------------------------------------------------==");
System.out.println("Mean value of the last N signals is: " + avg);
System.out.println("==-----------------------------------------------------==");
System.out.println("Variance value of the last N signals is:" + var);
System.out.println("==-----------------------------------------------------==");
System.out.println();
//tmp.put("mean", avg);
tmp += "avg: ";
tmp += avg + ", ";
//tmp.add(avg);
//tmp.put("var", var);
tmp += "var: ";
tmp += var + ".";
//tmp.add(var);
}
//producer sends the message to the "VIEW" topic, supporting ResultViewConsumer to get the current statistics
producer.send(session.createTextMessage(tmp));
producer.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* calculate the mean value of last N doubles*/
public double getAvg() { // calculate the mean value of last N doubles
double avg = 0.0;
for(int i = 0; i < this.currList.size(); i++) {
avg += this.currList.get(i);
}
return avg / this.currList.size();
}
/**
* calculate the variance value of last N doubles*/
public double getVar() { // calculate the variance value of last N doubles
double var = 0.0;
double avg = this.getAvg();
for(int i = 0; i < this.currList.size(); i++) {
var += Math.pow(this.currList.get(i) - avg, 2);
}
return var / this.currList.size();
}
/**
* calculate the variance value of last N doubles, with mean value pre-calculated
* @param avg pre-calculated mean value of the N-sized-List*/
public double getVar(double avg) { // calculate the variance value of last N doubles, with mean value pre-calculated
double var = 0.0;
for(int i = 0; i < this.currList.size(); i++) {
var += Math.pow(this.currList.get(i) - avg, 2);
}
return var / this.currList.size();
}
}
SignalListener
import java.util.ArrayList;
import java.util.List;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
//import javax.jms.Message;
//import javax.jms.TextMessage;
import org.knowm.xchart.SwingWrapper;
import org.knowm.xchart.XYChart;
import org.knowm.xchart.XYChartBuilder;
import org.knowm.xchart.style.Styler.ChartTheme;
import org.knowm.xchart.style.Styler.LegendLayout;
import org.knowm.xchart.style.Styler.LegendPosition;
/**
* This class serves as the visualization of the current random signal, uses XChart as the visualization tool
* @author Fa1con_JS, Xidian University*/
public class SignalListener implements MessageListener {
private XYChart chart;
private SwingWrapper<XYChart> sw;
private List<Double> data = null;
private int num = 50; //chart's maximum display range.(x-axis length)
/**
* the chart's maximum display range is default set to 50.*/
public SignalListener() {
// TODO Auto-generated constructor stub
this.data = new ArrayList<Double>();
this.data.add(0.0); //when this listener is built, the default display value will be set to (0.0, 0.0)
//chart's settings(including legend and size, etc.)
this.chart = new XYChartBuilder().width(600).height(450).theme(ChartTheme.Matlab).title("real-time signal visualization").build();
this.chart.addSeries("amplitude(random's number)", null, data);
this.chart.getStyler().setLegendPosition(LegendPosition.OutsideS);
this.chart.getStyler().setLegendLayout(LegendLayout.Horizontal);
//display the chart
this.sw = new SwingWrapper<XYChart>(chart);
this.sw.displayChart();
}
public void onMessage(Message message) {
try {
//deserialization of the message received. Convert it to the src format (double)
Double rdReceived = (Double) ((ObjectMessage)message).getObject();
double rd = rdReceived.doubleValue();
data.add(rdReceived);
//update the Y-Data of the chart
if(data.size() > num) {
data.remove(0);
}
this.chart.updateXYSeries("amplitude(random's number)", null, data, null);
//update the chart
this.sw.repaintChart();
//show the received number
System.out.println("Received a double message: "+ rd);
} catch (Exception e) {
e.printStackTrace();
}
}
}