0
点赞
收藏
分享

微信扫一扫

西电计科分布式第四次作业设计说明

海滨公园 2022-04-30 阅读 76

题目要求重述

开发环境+系统环境

Windows 10 + eclipse(2022-03) + Java 1.8 + ActiveMQ 5.16.4(with Java 8+ support)+Xchart(visualization)

ActiveMQ下载地址:ActiveMQ

(国内下载可能比较慢,注意要下载和当前Java版本支持的对应ActiveMQ版本)

ActiveMQ安装:ActiveMQ的安装与使用_qq_29651203的博客-CSDN博客_activemq安装与配置

XChart教程及示例:XChart Example Code – Knowm.org

XChart实时折线图Demo:【java】XChart实时折线图_CallMeJacky的博客-CSDN博客_xchart

XChart在Maven中pom.xml配置如下:

<dependency>
	<groupId>org.knowm.xchart</groupId>
	<artifactId>xchart</artifactId>
	<version>3.8.1</version>
</dependency>

具体设计

Java工程文件结构 

架构图 

        其中:绿色背景表示该类的主要职责类似Publisher,粉色背景表示该类的主要职责类似Subscriber,绿粉相间的则既有Publisher也有Subscriber的功能。蓝色的则为Listener,负责侦听信息。白底的为Topic(主题)。

各.java文件说明

随机信号产生器

        RandomSignalProducer.java

        负责生成随机信号并通过sendMessage()函数传递随机数给SIGNAL主题。

随机信号统计分析

        SignalAnalysisListener.java

        负责侦听SIGNAL主题的信息并进行收集处理,以及生成实时数据显示的统计信息,通过MOM发布给VIEW主题。

        SignalAnalysisConsumer.java

        负责订阅SIGNAL主题,并通过对应的Listener进行侦听,采取的通信方式为异步通信。

实时数据显示

        SignalListener.java

        负责实时侦听SIGNAL主题的信息并进行可视化处理并输出折线图。(使用XChart)

        ResultListener.java

        负责侦听VIEW主题,输出侦听到的经过序列化的字符串信息。

        ResultViewConsumer.java

        负责订阅VIEW和SIGNAL主题,并通过对应的Listener进行侦听,采取的通信方式为异步通信。

实机演示结果

初始化(运行各xxConsumer.java)

 ActiveMQ初始化结果(Topic部分,暂未撤销之前处理过的数据)

 运行RandomSignalProducer.java:

注,为防止死循环,每次调用RandomSignalProducer生成100个随机数,而非一直生成,可以不断调用该类来实现持续生成。

ActiveMQ此时的信息(Topic部分):

 可以看到,SIGNAL的Enqueued数增加了100,Dequeued数增加了200,再次验证了Topic法通信时将单份信息重复分发给不同Consumer各一份的特点。

源代码部分

RandomSignalProducer 

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;


import org.apache.activemq.ActiveMQConnectionFactory;

public class RandomSignalProducer {

    private static String brokerURL = "tcp://localhost:61616";
    private static ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;
	private Topic topic;
    
    public RandomSignalProducer(String topicName) throws JMSException {
		
    	factory = new ActiveMQConnectionFactory(brokerURL);
    	connection = factory.createConnection();
        
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		topic = session.createTopic(topicName);
        producer = session.createProducer(topic);
		
		connection.start();
    }    
    
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }    
    
	public static void main(String[] args) throws JMSException {
    	RandomSignalProducer publisher = new RandomSignalProducer("SIGNAL");
        publisher.sendMessage();
        publisher.close();

	}
	
    public void sendMessage() throws JMSException {
        
		try {
			int maxSize = 100;
			double std_dev = 2.0;
			double mean = 20;
			Message msg;
			for(int i = 0 ; i < maxSize; i++) {
				Random r = new Random();
				double rd = r.nextGaussian() * std_dev + mean;
				msg = session.createObjectMessage(rd);
				producer.send(msg);
				System.out.println("Random double generated: " + rd);
				//sleep duration is 100 ms
				Thread.sleep(100);
			}
		} catch(InterruptedException ie) {
			ie.printStackTrace();
		}
    }	

}

 ResultListener

//import java.util.ArrayList;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * This class is served as the display of real-time signal's statistics.
 * @author Fa1con_JS, Xidian University*/
public class ResultListener implements MessageListener{
	public void onMessage(Message message) {
		try {
			//ArrayList<Double> received = (ArrayList<Double>) ((ObjectMessage)message).getObject();
			//System.out.println("Received a double message: "+ received.get(0) + " " + received.get(1));
			//get statistics of the random signal sequence
			System.out.println("Received a signal statistic message: ");
			System.out.println(((TextMessage)message).getText());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 ResultViewConsumer

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ResultViewConsumer {
	public static void main(String[] args) throws JMSException {
		String brokerURL = "tcp://localhost:61616";
		ConnectionFactory factory = null;
		Connection connection = null;
		
		Session session = null;
		
		Topic resultTopic = null;
		Topic drawTopic = null;
		
		MessageConsumer resultConsumer = null;
		MessageConsumer drawConsumer = null;
		
		ResultListener resultListener = null;
		SignalListener drawListener = null;
		
        
		try {
			factory = new ActiveMQConnectionFactory(brokerURL);
			connection = factory.createConnection();
						
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			
			resultTopic = session.createTopic("VIEW");
			drawTopic = session.createTopic("SIGNAL");

			resultConsumer = session.createConsumer(resultTopic);
			drawConsumer = session.createConsumer(drawTopic);
			
			
			resultListener = new ResultListener();
			drawListener = new SignalListener();
			
			resultConsumer.setMessageListener(resultListener);
			drawConsumer.setMessageListener(drawListener);
			
			connection.start();
			
			System.out.println("Press any key to exit.");
			System.in.read();   // Pause
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			connection.close();
		}
	}
}

 SignalAnalysisConsumer

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
//import javax.jms.Message;
//import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SignalAnalysisConsumer {

    public static void main(String[] args) throws JMSException {
		String brokerURL = "tcp://localhost:61616";
		ConnectionFactory factory = null;
		Connection connection = null;
		Session session = null;
		Topic topic = null;
		
		//MessageConsumer messageConsumer = null;
		MessageConsumer signalAnalyser = null;
		//MessageConsumer signalVisualizer = null;
		
		//SignalListener listener = null;
		SignalAnalysisListener sa = null;
		//SignalVisualizaion sv = null;
		
        
		try {
			factory = new ActiveMQConnectionFactory(brokerURL);
			connection = factory.createConnection();
						
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			topic = session.createTopic("SIGNAL");

			//messageConsumer = session.createConsumer(topic);
			signalAnalyser = session.createConsumer(topic);
			//signalVisualizer = session.createConsumer(topic);
			
			//listener = new SignalListener();
			// "VIEW" is the topic-to-sent, since SignalAnalysisListener is a hybrid of Consumer and Producer.
			sa = new SignalAnalysisListener("VIEW"); 
			//sv = new SignalVisualizaion();
			
			//messageConsumer.setMessageListener(listener);
			signalAnalyser.setMessageListener(sa);
			//signalVisualizer.setMessageListener(sv);
			
			connection.start();
			
			System.out.println("Press any key to exit.");
			System.in.read();   // Pause
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			connection.close();
		}
	}
	
}

 SignalAnalysisListener

import java.util.ArrayList;
//import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * This class serves as the signal analysis listener, which helps the Consumer who subscribes to the "SIGNAL"
 * topic to deal with the received data, and then serves as a Producer(Publisher) of the "VIEW" topic to produce
 * statistics of current signal sequences(e.g. MAX, MIN value)
 * @author Fa1con_JS, Xidian University*/
public class SignalAnalysisListener implements MessageListener {
	private double N;
	private double max, min;
	private ArrayList<Double> currList;
	private static String brokerURL = "tcp://localhost:61616";
    private static ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;
	private Topic topic;
	/**
	 * @param topicName the producer's topic
	 * @author Fa1con_JS*/
	public SignalAnalysisListener(String topicName) throws JMSException { // N's default value is set to be 10.
		// TODO Auto-generated constructor stub
		this.currList = new ArrayList<Double>();
		this.max = Double.NEGATIVE_INFINITY;
		this.min = Double.POSITIVE_INFINITY;
		this.N = 10;
		factory = new ActiveMQConnectionFactory(brokerURL);
    	connection = factory.createConnection();
        
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		topic = session.createTopic(topicName);
        producer = session.createProducer(topic);
		
        connection.start();
	}
	/**
	 * @param topicName the producer's topic
	 * @param n size of the N(a constant)
	 * @author Fa1con_JS*/
	public SignalAnalysisListener(String topicName, int n) throws JMSException { //with N self-designed of n.
		this.currList = new ArrayList<Double>();
		this.max = Double.NEGATIVE_INFINITY;
		this.min = Double.POSITIVE_INFINITY;
		this.N = n;
		factory = new ActiveMQConnectionFactory(brokerURL);
    	connection = factory.createConnection();
        
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		topic = session.createTopic(topicName);
        producer = session.createProducer(topic);
		
        connection.start();
	}
	
	public void onMessage(Message message) {
		try {
			factory = new ActiveMQConnectionFactory(brokerURL);
	    	connection = factory.createConnection();
	        
	        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	        producer = session.createProducer(topic);
			connection.start();
			
			Double rdReceived = (Double) ((ObjectMessage)message).getObject();
			//ArrayList<String, Double> tmp = new ArrayList<String, Double>();
			//ArrayList<Double> tmp = new ArrayList<Double>();
			String tmp = ""; // tmp saves the statistics of the current sequence, including: max, min, mean, variance
			double rd = rdReceived.doubleValue();
			max = max > rd ? max : rd; //update max value
			min = min < rd ? min : rd; //update min value
			//tmp.put("max", max);
			tmp += "max: ";
			tmp += max + ", ";
			//tmp.put("min", min);
			//tmp.add(max);
			tmp += "min: ";
			tmp += min + ", ";
			
			//show max and min values
			System.out.println();
			System.out.println("++++++++++++++++++++++++++++++++++++++++++");
			System.out.println("current MAX is: " + max + ".");
			System.out.println("current MIN is: " + min + ".");
			System.out.println("++++++++++++++++++++++++++++++++++++++++++");
			System.out.println();
			
			//if list's size is bigger than N, remove the beginning element and show new mean and variance value
			if(currList.size() < N) {
				currList.add(rd);
			} else {
				
				currList.remove(0);
				currList.add(rd);
				
				double avg = getAvg();
				double var = getVar(avg);
				
				System.out.println();
				System.out.println("==---------------------------------------------------==");
				System.out.println("Mean value of the last N signals is: " + avg);
				System.out.println("==-----------------------------------------------------==");
				System.out.println("Variance value of the last N signals is:" + var);
				System.out.println("==-----------------------------------------------------==");
				System.out.println();
				
				//tmp.put("mean", avg);
				tmp += "avg: ";
				tmp += avg + ", ";
				//tmp.add(avg);
				//tmp.put("var", var);
				tmp += "var: ";
				tmp += var + ".";
				//tmp.add(var);
			}
			
			//producer sends the message to the "VIEW" topic, supporting ResultViewConsumer to get the current statistics
			producer.send(session.createTextMessage(tmp));
			producer.close();
			connection.close();
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	/**
	 * calculate the mean value of last N doubles*/
	public double getAvg() { // calculate the mean value of last N doubles
		double avg = 0.0;
		for(int i = 0; i < this.currList.size(); i++) {
			avg += this.currList.get(i);
		}
		return avg / this.currList.size();
	}
	/**
	 * calculate the variance value of last N doubles*/
	public double getVar() { // calculate the variance value of last N doubles
		double var = 0.0;
		double avg = this.getAvg();
		for(int i = 0; i < this.currList.size(); i++) {
			var += Math.pow(this.currList.get(i) - avg, 2);
		}
		return var / this.currList.size();
	}
	/**
	 * calculate the variance value of last N doubles, with mean value pre-calculated
	 * @param avg pre-calculated mean value of the N-sized-List*/
	public double getVar(double avg) { // calculate the variance value of last N doubles, with mean value pre-calculated
		double var = 0.0;
		for(int i = 0; i < this.currList.size(); i++) {
			var += Math.pow(this.currList.get(i) - avg, 2);
		}
		return var / this.currList.size();
	}
}

 SignalListener

import java.util.ArrayList;
import java.util.List;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
//import javax.jms.Message;
//import javax.jms.TextMessage;

import org.knowm.xchart.SwingWrapper;
import org.knowm.xchart.XYChart;
import org.knowm.xchart.XYChartBuilder;
import org.knowm.xchart.style.Styler.ChartTheme;
import org.knowm.xchart.style.Styler.LegendLayout;
import org.knowm.xchart.style.Styler.LegendPosition;
/**
 * This class serves as the visualization of the current random signal, uses XChart as the visualization tool
 * @author Fa1con_JS, Xidian University*/
public class SignalListener implements MessageListener {
	private XYChart chart;
	private SwingWrapper<XYChart> sw;
	private List<Double> data = null;
	private int num = 50; //chart's maximum display range.(x-axis length)
	/**
	 * the chart's maximum display range is default set to 50.*/
	public SignalListener() {
		// TODO Auto-generated constructor stub
		this.data = new ArrayList<Double>();
		this.data.add(0.0); //when this listener is built, the default display value will be set to (0.0, 0.0)
		
		//chart's settings(including legend and size, etc.)
		this.chart = new XYChartBuilder().width(600).height(450).theme(ChartTheme.Matlab).title("real-time signal visualization").build();
		this.chart.addSeries("amplitude(random's number)", null, data);
		this.chart.getStyler().setLegendPosition(LegendPosition.OutsideS);
		this.chart.getStyler().setLegendLayout(LegendLayout.Horizontal);
		
		//display the chart
		this.sw = new SwingWrapper<XYChart>(chart);
		this.sw.displayChart();
	}
	public void onMessage(Message message) {
		try {
			//deserialization of the message received. Convert it to the src format (double)
			Double rdReceived = (Double) ((ObjectMessage)message).getObject();
			double rd = rdReceived.doubleValue();
			data.add(rdReceived);
			//update the Y-Data of the chart
			if(data.size() > num) {
				data.remove(0);
			}
			this.chart.updateXYSeries("amplitude(random's number)", null, data, null);
			//update the chart
			this.sw.repaintChart();
			//show the received number
			System.out.println("Received a double message: "+ rd);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
举报

相关推荐

第四次作业

Linux第四次作业

第四次作业(java)

python第四次作业

第四次作业20220327

java第四次作业

第四次Java作业

MySQL第四次作业

0 条评论