分布式作业
# 问题重述
![]()
# 实验原理
这段代码涉及到 Java 消息服务(JMS)和消息代理(Apache ActiveMQ),它们是用于构建分布式、可靠和异步消息传递系统的关键技术。
JMS 是 Java 中用于发送和接收消息的 API 规范,它提供了一种标准的方式来创建、发送、接收和管理消息。JMS 有两种消息传递模型:点对点(Point-to-Point)和发布 / 订阅(Publish/Subscribe)。在这个例子中,我们使用的是发布 / 订阅模型。
消息代理 是一个中间件系统,负责将消息从发送者传递给接收者。Apache ActiveMQ 是一个流行的开源消息代理,它实现了 JMS 规范,并提供了可靠的消息传递机制。在这个例子中,ActiveMQ 充当了消息代理的角色,负责管理消息的传递和路由。
# 开发环境
# 软件版本
Windows11
Eclipse IDE for Java Developers - 2022-03
Java 1.8
Active MQ 6.1.0
Xchart 3.8.1
Maven3.6.1
![]()
# pom 文件配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.encoding>UTF-8</maven.compiler.encoding> <java.version>21</java.version> <maven.compiler.source>21</maven.compiler.source> <maven.compiler.target>21</maven.compiler.target> </properties>
<groupId>com.xidian.dc.mq</groupId> <artifactId>mq-topic</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.knowm.xchart</groupId> <artifactId>xchart</artifactId> <version>3.8.1</version> </dependency> <dependency> <groupId>org.jfree</groupId> <artifactId>jfreechart</artifactId> <version>1.5.3</version> </dependency> <dependency> <groupId>jakarta.jms</groupId> <artifactId>jakarta.jms-api</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>6.1.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.17.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.17.2</version> </dependency> </dependencies>
</project>
|
# 设计思路
# 构架图
![]()
# 各个代码的用途
# Publisher
随机信号产生器微服务,生成正太分布的随机数字并发布在 MYTOPIC 服务上
生成方式 'double randomNumner = random.nextGaussian () * stdDev + mean ; '
通过传输 main 函数的变量来区分不同的产生器
# ASyncConsumer
随机信号统计分析微服务,并将处理后的数据发布在 DATA 服务上
storeMessage: 对传输过来的数据进行处理并存储
publisher: 对存储的数据进行统计和发布
# MyListener
实现 MessageListener 接口,并接收存储数据。实现对 ASyncConsumer 的 consumer 的处理
# ShowConsumer
实时数据显示微服务,并调用 RealTimeChart 实现数据的可视化
paint: 通过调用 RealTimeChart 里面的 plot 来实现实时绘制图像
# MyListener1
实现 MessageListener 接口,并接收存储数据。实现对 ShowConsumer 的 consumer 的处理
# RealTimeChart
实现实时画图
# Commands
mvn clean
mvn compile
mvn exec:java -Dexec.mainClass="ASyncConsumer"
mvn exec:java -Dexec.mainClass="ShowConsumer"
mvn exec:java -Dexec.mainClass="Publisher" -Dexec.args="1"
mvn exec:java -Dexec.mainClass="Publisher" -Dexec.args="2"
mvn exec:java -Dexec.mainClass="Publisher" -Dexec.args="3"
mvn exec:java -Dexec.mainClass="Publisher" -Dexec.args="4"
# JAVA 程序
# MyListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
import jakarta.jms.Message; import jakarta.jms.MessageListener; import jakarta.jms.ObjectMessage; import jakarta.jms.Message; import jakarta.jms.TextMessage;
public class MyListener implements MessageListener { private ASyncConsumer consumer ; public MyListener ( ASyncConsumer consumer ) { this.consumer = consumer ; }
public void onMessage(Message message) { try { String text = ((TextMessage) message).getText() ; System.out.println("Received a message: "+text); consumer.storeMessage(text); consumer.publisher("ACK"); } catch (Exception e) { e.printStackTrace(); } }
}
|
# Publisher.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| import jakarta.jms.Connection; import java.util.Random ; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Message; import jakarta.jms.MessageProducer; import jakarta.jms.ObjectMessage; import jakarta.jms.Session; import jakarta.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Publisher {
private static String brokerURL = "tcp://localhost:61616"; private static ConnectionFactory factory; private Connection connection; private Session session; private MessageProducer producer; private Topic topic; public Publisher(String topicName) throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic(topicName); producer = session.createProducer(topic); connection.start(); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher("MYTOPIC"); System.out.println(args[0]) ; while ( true ) { publisher.sendMessage(args[0]); try { Thread.sleep(1000); } catch (InterruptedException e) { publisher.close(); e.printStackTrace(); } } } public void sendMessage(String UID ) throws JMSException { Random random = new Random( ) ; int num = Integer.parseInt(UID) ; double mean = num ; double stdDev = 1 ; double randomNumner = random.nextGaussian() * stdDev + mean ; Message message = session.createTextMessage(UID+"+"+Double.toString(randomNumner)); producer.send(message); System.out.println("Sent a message!"); } public void sendData ( String Data ) throws JMSException { Message message = session.createTextMessage(Data) ; producer.send(message); System.out.println ("Sent a Data!") ; }
}
|
# ASyncConsumer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.MessageConsumer; import jakarta.jms.MessageProducer; import jakarta.jms.Session; import jakarta.jms.Message; import jakarta.jms.TextMessage; import jakarta.jms.Topic; import java.util.ArrayList ; import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ASyncConsumer { private List<String> messages ; private List<Double>[] arrayOfLists = new List[5] ; public ASyncConsumer ( ) { messages = new ArrayList<>() ; for ( int i = 0 ; i < arrayOfLists.length ; i ++ ) { arrayOfLists[i] = new ArrayList<>() ; } } public void storeMessage ( String message ) { messages.add(message) ; String[] parts = message.split("\\+") ; String intPart = parts[0] ; String doublePart = parts[1] ; int uid = Integer.parseInt(intPart) ; double val = Double.parseDouble(doublePart) ; arrayOfLists[uid].add(val) ; } public List<String> getMessages ( ) { return messages ; } public void printMessages ( ) { System.out.println("Messages:") ; for ( String message : messages ) { System.out.print ( "orz" + message ) ; } } public static double calculateMean(List<Double> data) { double sum = 0; for (double value : data) { sum += value; } if ( data.size() == 0 ) return 0 ; return sum / data.size(); }
public static double calculateVariance(List<Double> data, double mean) { double sumSquaredDiff = 0; for (double value : data) { sumSquaredDiff += Math.pow(value - mean, 2); } if ( data.size() == 0 ) return 0 ; return sumSquaredDiff / data.size(); } public void publisher ( String topicname ) throws JMSException { Publisher publisher = new Publisher("DATA"); int n = 5 ; for ( int i = 0 ; i < 5 ; i ++ ) { double mean = calculateMean (arrayOfLists[i].subList(Math.max(arrayOfLists[i].size()-n, 0),arrayOfLists[i].size())) ; double variance = calculateVariance (arrayOfLists[i].subList(Math.max(arrayOfLists[i].size()-n, 0),arrayOfLists[i].size()),mean) ; double Max = arrayOfLists[i].stream().mapToDouble(Double::doubleValue).max().orElse(0) ; double Min = arrayOfLists[i].stream().mapToDouble(Double::doubleValue).min().orElse(0) ; double cur = 0 ; if ( arrayOfLists[i].size() != 0 ) cur = arrayOfLists[i].get(arrayOfLists[i].size()-1) ; else cur = 0 ; String Data = Integer.toString(i)+"+"+Double.toString(mean)+"+"+Double.toString(variance)+"+"+Double.toString(Max)+"+"+Double.toString(Min)+"+"+Double.toString(cur); publisher.sendData(Data); } }
public static void main(String[] args) throws JMSException { ASyncConsumer consumer = new ASyncConsumer ( ) ; String brokerURL = "tcp://localhost:61616"; ConnectionFactory factory = null; Connection connection = null; Session session = null; Topic topic = null; MessageConsumer messageConsumer = null; MyListener listener = null; try { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic("MYTOPIC");
messageConsumer = session.createConsumer(topic); listener = new MyListener(consumer); messageConsumer.setMessageListener(listener); connection.start(); System.out.println("Press any key to exit."); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } }
|
# ShowConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.MessageConsumer; import jakarta.jms.MessageProducer; import jakarta.jms.Session; import jakarta.jms.Message; import jakarta.jms.TextMessage; import jakarta.jms.Topic; import java.util.ArrayList ; import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ShowConsumer { private List<Double>[][] arrayOfLists = new List[6][5] ; RealTimeChart[] chart = new RealTimeChart[6] ; public ShowConsumer ( ) { for ( int i = 1 ; i < 6 ; i ++ ) { for ( int j = 0 ; j < 5 ; j ++ ) { arrayOfLists[i][j] = new ArrayList<>() ; } } for ( int i = 1 ; i < 6 ; i ++ ) { chart[i] = new RealTimeChart ( "History Data" , "UID"+Integer.toString(i) , 500 ) ; } }
public void paint ( ) { for ( int i = 1 ; i < 6 ; i ++ ) { if ( arrayOfLists[i][4].size() != 0 ) chart[i].plot(arrayOfLists[i][4].get(arrayOfLists[i][4].size()-1)); } } public void storeMessage ( String message ) { String[] parts = message.split("\\+") ; String uidPart = parts[0] ; String meanPart = parts[1] ; String variancePart = parts[2] ; String maxPart = parts[3] ; String minPart = parts[4] ; String curPart = parts[5] ; int uid = Integer.parseInt(uidPart) ; double mean = Double.parseDouble(meanPart) ; double variance = Double.parseDouble(variancePart) ; double Max = Double.parseDouble(maxPart) ; double Min = Double.parseDouble(minPart) ; double cur = Double.parseDouble(curPart) ; arrayOfLists[uid][0].add(mean) ; arrayOfLists[uid][1].add(variance) ; arrayOfLists[uid][2].add(Max) ; arrayOfLists[uid][3].add(Min) ; arrayOfLists[uid][4].add(cur) ; } public static void main(String[] args) throws JMSException { ShowConsumer consumer = new ShowConsumer ( ) ; String brokerURL = "tcp://localhost:61616"; ConnectionFactory factory = null; Connection connection = null; Session session = null; Topic topic = null; MessageConsumer messageConsumer = null; MyListener2 listener = null; try { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic("DATA");
messageConsumer = session.createConsumer(topic); listener = new MyListener2(consumer); messageConsumer.setMessageListener(listener); connection.start(); System.out.println("Press any key to exit."); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { connection.close(); } } }
|
# MyListener2.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import jakarta.jms.Message; import jakarta.jms.MessageListener; import jakarta.jms.ObjectMessage; import jakarta.jms.Message; import jakarta.jms.TextMessage;
public class MyListener2 implements MessageListener { private ShowConsumer consumer ; public MyListener2 ( ShowConsumer consumer ) { this.consumer = consumer ; }
public void onMessage(Message message) { try { String text = ((TextMessage) message).getText() ; System.out.println("Received a message2: "+text); consumer.storeMessage(text); consumer.paint ( ) ; } catch (Exception e) { e.printStackTrace(); } }
}
|
# RealTimeChart.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| import java.util.LinkedList; import java.util.List;
import javax.swing.JFrame;
import org.knowm.xchart.SwingWrapper; import org.knowm.xchart.XYChart; import org.knowm.xchart.XYChartBuilder; import org.knowm.xchart.style.Styler.ChartTheme; import org.knowm.xchart.style.Styler.LegendLayout; import org.knowm.xchart.style.Styler.LegendPosition;
public class RealTimeChart { private SwingWrapper<XYChart> swingWrapper ; private XYChart chart ; private JFrame frame ; private String title ; private String seriesName ; private List<Double> seriesData ; private int size = 1000 ; public int getSize ( ) { return size ; } public String getSeriesName ( ) { return seriesName ; } public void setSeeriesName ( String seriesName ) { this.seriesName = seriesName ; } public String getTitle ( ) { return title ; } public void setTitle ( String title ) { this.title = title ; } public RealTimeChart ( String title , String seriesName ) { super ( ) ; this.seriesName = seriesName ; this.title = title ; } public RealTimeChart ( String title , String seriesName , int size ) { super ( ) ; this.seriesName = seriesName ; this.title = title ; this.size = size ; } public void plot ( double data ) { if ( seriesData == null ) { seriesData = new LinkedList<>() ; } if ( seriesData.size( ) == this.size ) { seriesData.clear ( ) ; } seriesData.add(data) ; if ( swingWrapper == null ) { chart = new XYChartBuilder().width(600).height(450).theme(ChartTheme.Matlab).title(title).build(); chart.addSeries(seriesName, null, seriesData); chart.getStyler().setLegendPosition(LegendPosition.OutsideS); chart.getStyler().setLegendLayout(LegendLayout.Horizontal); swingWrapper = new SwingWrapper<XYChart>(chart); frame = swingWrapper.displayChart(); frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); } else { chart.updateXYSeries ( seriesName , null , seriesData , null ) ; swingWrapper.repaintChart(); } } }
|
# 实现效果
![]()
可以看到不同的产生器的数据分布在不同的值之间,产生器 1 徘徊在 1 之间,产生器 2 徘徊在 2 之间以此类推
可以看到无论是处理器还是产生器在命令指示符中均正常工作,达到实验预期!
![]()
观察 topic 界面,可以看出在正常工作,符合预期!