代码 / 分布式

P2P群聊系统

35 分钟阅读
代码分布式代码分布式

分布式作业

问题重述

实验原理

全序广播协议

采用一个服务端,多个客户端的方法,当客户端收到服务端传来的信息时,判断该信息属于ACK信息、新加入客户端信息还是message消息。

如果这条信息是属于ACK信息,将对应消息m的ACK+1,观察队列头部的消息,如果关于该消息已经收到了所有节点的ACK消息,则将其从队列中取出,并完成投递。

如果这条信息属于发送的信息,将消息按逻辑时间戳先后顺序放入缓存队列,观察缓存队列头部的消息,如果该消息尚未广播过ACK消息,则向所有节点发送关于该消息的ACK信息.

具体实现如下图代码所示

因果关系

当客户端收到一条消息就将自己的时间戳与该信息的时间戳相比较,更新为更大的值实现因果关系的同一。

具体实现如下

传递消息

采用socket传递消息

雪花算法

为Server提供信息的ID号

实验环境

软件版本

Windows11
Eclipse IDE for Java Developers - 2022-03
Java 1.8
Maven3.6.1
Gson2.8.5

pom环境

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.ls</groupId>
  <artifactId>p2psystem</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <dependencies>
  
  	<dependency>
    	<groupId>com.google.code.gson</groupId>
    	<artifactId>gson</artifactId>
    	<version>2.8.5</version>
  	</dependency>
  	
  	<dependency>
    	<groupId>org.glassfish.tyrus.bundles</groupId>
    	<artifactId>tyrus-standalone-client</artifactId>
    	<version>1.18</version>
	</dependency>
  	
  	
  </dependencies>
  
  
</project>

设计思路

构架图

各个代码的用途

Client

接受服务端的信息,处理终端和前端UI的信息,将最终信息传递给TextSaverApp进行展示。

Server

不断接收来自PORT端口的信息,并将信息传给所有的Client节点,值得主义的是由于messageID是唯一的,所以需要进行锁操作,我采用了Java中的synchronized函数对此变量进行锁操作。

TextSaverAPP

对文本框接收到的信息进行处理并传递给Server服务器;对从Client收到的信息进行展示。

MessageUtils\Message

对信息进行定义并执行序列化和反序列化操作

MessageQueue

对信息进行存储、删除、记录次数操作,并利用优先队列对信息进行排序。

SnowflakeIdGenerator

生成信息的ID号

Commands

mvn clean
mvn compile

mvn exec:java -Dexec.mainClass=“p2psystem.Server”

mvn exec:java -Dexec.mainClass=“p2psystem.Client”

JAVA程序

Client.java


package p2psystem;

import java.io.* ;
import java.net.Socket;
import com.google.gson.Gson; 

public class Client {
	
	private static final String SEVERADDRESS = "127.0.0.1" ; 
	private static final int PORT = 8189 ; 
	private static int clientNum = 1 ; 
	private static int logicNum = 1 ; 
	private static final Gson gson = new Gson() ;
	private static final int ACKseq = 1145141919 ; 
	private static final int ACKnew = 1919810 ; 
	private static final MessageQueue Mqueue = new MessageQueue ( ) ; 
	private static TextSaverApp chat = new TextSaverApp("chatgroup") ; 
	private static int ClientName ;
	private static int ACKname = 0 ; 
	
	public static void main ( String [] args ) throws Exception {
		// 定义socket,输入输出变量
		Socket socket = new Socket ( SEVERADDRESS , PORT ) ; 
		BufferedReader in = new BufferedReader ( new InputStreamReader ( socket.getInputStream() ) ) ; 
		PrintWriter out = new PrintWriter ( socket.getOutputStream() , true ) ; 
		BufferedReader consoleInput = new BufferedReader ( new InputStreamReader ( System.in ) ) ; 
		// 创造一个不断接受服务端信息的进程
		new Thread ( ( ) -> {
			try {
				String serverMessageJson ; 
				while ( ( serverMessageJson = in.readLine() ) != null ) {
					Message message = MessageUtils.deserialize(serverMessageJson) ; 
					// 如果这条信息是属于ACK信息
					// 将对应消息m的ACK+1
					// 观察队列头部的消息,如果关于该消息已经收到了所有节点的ACK消息,则将其从队列中取出,并完成投递
					if ( message.getseq() == ACKseq ) {
						//Mqueue.printAllMessages();
						Mqueue.incrementIdCount(message) ; 
						Message HeadMessage = Mqueue.getHeadMessage() ; 
						int count = Mqueue.getMessageCount(HeadMessage.getId()) ; 
						if ( count == clientNum ) {
							logicNum = logicNum + 1 ; 
							chat.setLogicNum(logicNum) ;
							Mqueue.removeMessage(HeadMessage.getId()) ; 
							//传递给应用层
							chat.appendText(ClientName + ":" + HeadMessage.toString());
							System.out.println ( HeadMessage.toString() ) ; 
						}
					}
					// 处理新来的客户端
					else if ( message.getseq() == ACKnew ) {
						logicNum = logicNum + 1 ; 
						chat.setLogicNum(logicNum) ;
						clientNum = (int)message.getId() ; 
						if ( ACKname == 0 ) {
							ACKname = 1 ; 
							ClientName = clientNum ; 
						}
						System.out.println ( clientNum ) ; 
					}
					// 如果这条信息属于发送的信息
					// 将消息按逻辑时间戳先后顺序放入缓存队列
					// 观察缓存队列头部的消息,如果该消息尚未广播过ACK消息,则向所有节点发送关于该消息的ACK信息
					else { 
						Mqueue.addMessage(message) ;
						//Mqueue.printAllMessages();
						//System.out.println(message.toString());
						Message HeadMessage = Mqueue.getHeadMessage() ; 
						int count = Mqueue.getBroadCount(HeadMessage.getId()) ;
						//System.out.println ( count ) ; 
						if ( count == 0 ) {
							// 发送ACK信息
							Mqueue.incrementBroadCount(message) ; 
							Message Hmessage = new Message ( message.getId() , ACKseq , message.getContent() )  ; 
							String messageJson = MessageUtils.serialize(Hmessage) ; 
							logicNum = Math.max(logicNum, message.getseq()) ; 
							logicNum = logicNum + 1 ; 
							chat.setLogicNum(logicNum) ; 
							out.println(messageJson);
						}
					}
				}
			} catch ( IOException e ) {
				e.printStackTrace() ; 
			}
		}).start ( ) ; 
		// 告诉服务器有新客户端加入
		Message infoM = new Message( 0 , ACKnew , "Hello") ;
		String messageJso = MessageUtils.serialize(infoM) ; 
		out.println ( messageJso ) ; 
		// 对面板输入的数据处理
		// 对终端输入的数据进行处理
		String userInput ; 
		while ( (userInput = consoleInput.readLine()) != null ) {
			Message message = new Message ( 0 , logicNum , userInput ) ;
			String messageJson = MessageUtils.serialize(message) ; 
			logicNum = logicNum + 1 ; 
			chat.setLogicNum(logicNum) ;
			out.println(messageJson);
		}
	}

}

Server.java


package p2psystem;

import java.io.* ; 
import java.net.* ; 
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap;
import com.google.gson.Gson; 

public class Server {
	// 定义需要的全局变量
	private static final int PORT = 8189 ; 
	private static int messageID = 0 ; 
	private static Map<Integer,PrintWriter> clientWriters = new ConcurrentHashMap<>() ; 
	private static final Gson gson = new Gson() ; 
	private static final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator ( 1 , 1 ) ;
	private static final int ACKseq = 1145141919 ;
	private static final int ACKnew = 1919810 ;
	private static int clientNum = 0 ;
	
	public static void main ( String[] args ) {
		System.out.println ( "Central server started!") ; 
		try ( ServerSocket serverSocket = new ServerSocket ( PORT ) ) {
			int count = 0 ; 
			while ( true ) {
				new ClientHandler( serverSocket.accept() ).start() ; 
			}
		} catch ( IOException e ) {
			e.printStackTrace(); 
		}
	}
	
	private static class ClientHandler extends Thread {
		private Socket socket ; 
		private int clientID ; 
		private BufferedReader in ; 
		private PrintWriter out ; 
		
		public ClientHandler ( Socket socket ) {
			this.socket = socket ; 
		}
		
		@Override
		public void run ( ) {
			try {
				BufferedReader in = new BufferedReader ( new InputStreamReader ( socket.getInputStream() ) ) ; 
				PrintWriter out = new PrintWriter ( socket.getOutputStream() , true ) ; 
				clientID = socket.getPort() ; 
				clientWriters.put(clientID, out) ; 
				
				String messageJson ; 
				while ( (messageJson = in.readLine()) != null ) {
					// synchronized用来锁这个相同同步块的线程,因为messageID时唯一的,只能有一个来进行写操作
					synchronized ( Server.class ) {
						Message message = MessageUtils.deserialize(messageJson);
						if ( message.getId() == 0 )
							message.setId ( idGenerator.nextId() ) ; 
						if ( message.getseq() == ACKnew ) {
							message.setId((long)clientNum+(long)1);
							clientNum = clientNum + 1 ; 
						}
						String broadcastMessage = MessageUtils.serialize(message) ; 
						for ( PrintWriter writer : clientWriters.values() ) {
							writer.println ( broadcastMessage ) ; 
						}
					}
				}
			} catch ( IOException e ) {
				e.printStackTrace(); 
			} finally {
				if ( out != null ) {
					clientWriters.remove(clientID) ; 
				}
				try {
					socket.close(); 
				} catch ( IOException e ) {
					e.printStackTrace();
				}
			}
		}
	}
	
}

SnowflakeldGenerator.java


package p2psystem;

public class SnowflakeIdGenerator {
    private final long workerId;
    private final long datacenterId;

    private final long twepoch = 1288834974657L;

    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;

    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeIdGenerator(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    public synchronized long nextId() {
        long timestamp = timeGen();

        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }

        lastTimestamp = timestamp;

        return ((timestamp - twepoch) << timestampLeftShift) |
                (datacenterId << datacenterIdShift) |
                (workerId << workerIdShift) |
                sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }
}

TextSaverApp


package p2psystem;

import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class TextSaverApp {
    private JFrame frame;
    private JTextArea textArea;
    private JTextField textField;
    private JButton sendButton;
    private BufferedWriter writer;
    
    private static final String SEVERADDRESS = "127.0.0.1" ; 
	private static final int PORT = 8189 ;
	private static int logicNum = 1 ;
	Socket socket ;
	BufferedReader in ; 
	PrintWriter out ; 
	
	public void setLogicNum ( int logicNum ) { 
		this.logicNum = logicNum ; 
	}
	
    public TextSaverApp() {
        setupUI();
        setupWriter("output.txt");
        try {
        	socket = new Socket ( SEVERADDRESS , PORT ) ; 
        	in = new BufferedReader ( new InputStreamReader ( socket.getInputStream() ) ) ; 
        	out = new PrintWriter ( socket.getOutputStream() , true ) ; 
        } catch ( IOException e ) {
        	e.getStackTrace() ; 
        }
    }

    public TextSaverApp(String filePath) {
        setupUI();
        setupWriter(filePath);
        try {
        	socket = new Socket ( SEVERADDRESS , PORT ) ; 
        	in = new BufferedReader ( new InputStreamReader ( socket.getInputStream() ) ) ; 
        	out = new PrintWriter ( socket.getOutputStream() , true ) ; 
        } catch ( IOException e ) {
        	e.getStackTrace() ; 
        }
    }

    private void setupUI() {
        frame = new JFrame("Text Saver");
        textArea = new JTextArea(20, 50);
        textField = new JTextField(40);
        sendButton = new JButton("Send");

        textArea.setEditable(false);
        JPanel panel = new JPanel();
        panel.setLayout(new BorderLayout());
        panel.add(new JScrollPane(textArea), BorderLayout.CENTER);

        JPanel inputPanel = new JPanel();
        inputPanel.add(textField);
        inputPanel.add(sendButton);

        frame.getContentPane().add(panel, BorderLayout.CENTER);
        frame.getContentPane().add(inputPanel, BorderLayout.SOUTH);

        sendButton.addActionListener(new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                sendText();
            }
        });

        textField.addActionListener(new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                sendText();
            }
        });

        frame.pack();
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        frame.setVisible(true);
    }

    private void setupWriter(String filePath) {
        try {
            writer = new BufferedWriter(new FileWriter(filePath, true));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void sendText() {
        String text = textField.getText();
        if (!text.isEmpty()) {
        	Message message = new Message ( 0 , logicNum , text ) ;
			String messageJson = MessageUtils.serialize(message) ; 
			logicNum = logicNum + 1 ; 
			out.println(messageJson);
			textField.setText("");
            /*textArea.append(text + "\n");
            

            try {
                writer.write(text);
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }*/
        }
    }

    public void appendText(String text) {
        if (!text.isEmpty()) {
            textArea.append(text + "\n");

            try {
                writer.write(text);
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

MessageQueue


package p2psystem;

import java.util.*;

public class MessageQueue {
    private PriorityQueue<Message> messageQueue;
    private Map<Long, Integer> idCounts;
    private Map<Long, Integer> broadCounts;

    public MessageQueue() {
        // 初始化优先队列和ID计数器
        messageQueue = new PriorityQueue<>(Comparator.comparingLong(Message::getseq));
        idCounts = new HashMap<>();
        broadCounts = new HashMap<>() ; 
    }

    // 添加消息到队列
    public void addMessage(Message message) {
        messageQueue.offer(message); // 添加到优先队列
    }
    
    public void incrementBroadCount ( Message message ) {
    	broadCounts.put(message.getId(), 1 ) ; 
    }
    
    public int getBroadCount ( long id ) { 
    	return broadCounts.getOrDefault(id, 0);
    }
    
    // 增加id的ACK计数
    public void incrementIdCount ( Message message ) {
    	idCounts.put(message.getId(), idCounts.getOrDefault(message.getId(), 0) + 1); // 增加ID计数
    }

    // 删除指定ID的消息
    public void removeMessage(long id) {
        // 创建一个新的优先队列用于存储剩余的消息
        PriorityQueue<Message> newQueue = new PriorityQueue<>(Comparator.comparingLong(Message::getseq));
        // 从原队列中移除所有指定ID的消息,并更新ID计数
        while (!messageQueue.isEmpty()) {
            Message message = messageQueue.poll();
            if (message.getId() != id) {
                newQueue.offer(message); // 将非指定ID的消息添加到新队列中
            } else {
                idCounts.put(id, 0);
            }
        }
        // 更新队列为新队列
        messageQueue = newQueue;
    }
    
    // 获取头部信息
    public Message getHeadMessage ( ) {
    	return messageQueue.peek() ; 
    }

    // 获取队列中的所有消息
    public List<Message> getAllMessages() {
        return new ArrayList<>(messageQueue);
    }

    // 获取指定ID的消息数量
    public int getMessageCount(long id) {
        return idCounts.getOrDefault(id, 0);
    }

    // 打印队列中的所有消息
    public void printAllMessages() {
        System.out.println("All Messages in the Queue:");
        for (Message message : messageQueue) {
            System.out.println(message.toString());
        }
    }
}

Message


package p2psystem;

import java.io.Serializable;

public class Message implements Serializable {
	private static final long serialVersionUID = 1L ; 
	
	private long id ; 
	private String content ; 
	private int seq ; 
	
	public Message ( long id , int seq , String content ) {
		this.id = id ; 
		this.seq = seq ; 
		this.content = content ; 
	}
	
	public long getId ( ) { return id ; } 
	public void setId ( long id ) { this.id = id ; } 
	public int getseq ( ) { return seq ; } 
	public void setseq ( int seq ) { this.seq = seq ; } 
	public String getContent ( ) { return content ; } 
	public void setContent ( String content ) { this.content = content ; } 
	
	public String toString ( ) {
		return "Message{"+
				"id="+id+
				",seq="+seq+
				",content='"+content+'\''+
				'}';
	}
}

MessageUtils


package p2psystem;

import com.google.gson.Gson;

public class MessageUtils {
	private static final Gson gson = new Gson ( ) ; 
	
	public static String serialize ( Message message ) {
		return gson.toJson(message) ; 
	}
	
	public static Message deserialize ( String json ) {
		return gson.fromJson(json, Message.class) ; 
	}
}

实现效果


当我们在文本框中输入想发送的信息后,点击Send该信息会在Client各自的显示界面中出现,对于不同文本框输入的值,我们可以发现Id号是严格递增的符合预期,Seq代表的逻辑序号也是递增的,符合我们实现的原理。可以看到在后台的命令指示符中各自的程序也都是良好的运行,符合预期!