请用Java在UDP的基础上、于应用层实现可靠数据传输,要求按照rdt模型逐步实现,看你能实现到哪个版本,最好能达到TCP的功能,并给出完整代码
时间: 2024-06-09 10:04:28 浏览: 162
好的,以下是基于UDP的可靠数据传输的Java代码。它在rdt3.0(校验和、序号、ACK、超时重传)的基础上加入了大小为4的发送窗口,由Sender、Receiver和Packet三个类组成:
Sender.java:
```java
import java.io.IOException;
import java.net.*;
/**
 * Sending side of a reliable transfer layered on top of UDP.
 *
 * <p>The sender implements a Go-Back-N sliding window: up to {@link #WINDOW_SIZE}
 * packets may be in flight, the receiver acknowledges cumulatively, and a single
 * timer tracks the oldest unacknowledged packet. When that timer expires every
 * packet in the window is retransmitted.
 *
 * <p>Thread-safety: the window state ({@code base}, {@code nextSeq}, {@code packets})
 * is shared between the thread running {@link #start()} and the ACK receiver thread
 * and is guarded by {@code lock}.
 *
 * <p>An instance is single-use: {@link #start()} closes the socket when it returns.
 * The sender retransmits indefinitely if the receiver never answers.
 */
public class Sender {
    private static final int PORT = 1234; // port the receiver listens on
    private static final String ADDRESS = "localhost"; // receiver host
    private static final int MAX_SIZE = 1024; // maximum datagram size in bytes
    private static final int WINDOW_SIZE = 4; // maximum number of unacknowledged packets
    private static final int TIMEOUT = 1000; // retransmission timeout in milliseconds
    // Number of packets in one transfer. The receiver stops after sequence
    // number 1023, so both sides must agree on this value.
    private static final int TOTAL_PACKETS = 1024;

    private final DatagramSocket socket; // UDP socket used for data and ACKs
    private final InetAddress receiverAddress; // receiver IP address
    private final int receiverPort; // receiver port
    private final Object lock = new Object(); // guards base, nextSeq and packets
    private final Packet[] packets; // sent-but-unacknowledged packets, indexed by seq % WINDOW_SIZE
    private int base; // oldest unacknowledged sequence number
    private int nextSeq; // next sequence number that has never been sent
    private volatile boolean finished; // set once the transfer is over; stops the ACK thread

    /**
     * Creates a sender bound to an ephemeral local port.
     *
     * @throws UnknownHostException if the receiver host name cannot be resolved
     * @throws SocketException if the UDP socket cannot be opened
     */
    public Sender() throws UnknownHostException, SocketException {
        // Resolve the address first so a lookup failure cannot leak an open socket.
        receiverAddress = InetAddress.getByName(ADDRESS);
        receiverPort = PORT;
        socket = new DatagramSocket();
        packets = new Packet[WINDOW_SIZE];
        base = 0;
        nextSeq = 0;
        finished = false;
    }

    /**
     * Sends all {@code TOTAL_PACKETS} packets and blocks until every one of them
     * has been acknowledged. Closes the socket before returning.
     *
     * @throws IOException if sending fails or the calling thread is interrupted
     */
    public void start() throws IOException {
        // Daemon thread: it must never keep the JVM alive after the transfer.
        Thread ackThread = new Thread(new AckReceiverThread(), "ack-receiver");
        ackThread.setDaemon(true);
        ackThread.start();
        try {
            synchronized (lock) {
                while (base < TOTAL_PACKETS) {
                    // Fill the window with packets that have never been sent.
                    while (nextSeq < TOTAL_PACKETS && nextSeq < base + WINDOW_SIZE) {
                        sendPacket(nextSeq);
                    }
                    // base < nextSeq holds here, so the slot of base is populated.
                    long elapsed = System.currentTimeMillis()
                            - packets[base % WINDOW_SIZE].getTimeStamp();
                    if (elapsed >= TIMEOUT) {
                        // Timer of the oldest packet expired: go back N.
                        for (int seq = base; seq < nextSeq; seq++) {
                            sendPacket(seq);
                        }
                    } else {
                        // Sleep until an ACK moves the window or the timer expires.
                        lock.wait(TIMEOUT - elapsed);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            java.io.InterruptedIOException interrupted =
                    new java.io.InterruptedIOException("Interrupted while waiting for ACKs");
            interrupted.initCause(e);
            throw interrupted;
        } finally {
            finished = true;
            socket.close(); // also unblocks the ACK thread's receive()
        }
        System.out.println("All packets have been sent successfully.");
    }

    /**
     * Sends the packet with the given sequence number. A sequence number equal to
     * {@code nextSeq} creates a fresh packet; any other value retransmits the
     * buffered one. The caller must hold {@code lock}.
     *
     * @param seq sequence number in the range {@code [base, nextSeq]}
     * @throws IOException if the datagram cannot be sent
     */
    private void sendPacket(int seq) throws IOException {
        Packet packet;
        if (seq == nextSeq) {
            // First transmission: build the packet and remember it for retransmission.
            packet = new Packet(seq, generateDataPacket());
            packets[seq % WINDOW_SIZE] = packet;
            nextSeq++;
        } else {
            // Retransmission of a packet that is still unacknowledged.
            packet = packets[seq % WINDOW_SIZE];
        }
        byte[] buffer = packet.toBytes();
        DatagramPacket datagramPacket =
                new DatagramPacket(buffer, buffer.length, receiverAddress, receiverPort);
        socket.send(datagramPacket);
        System.out.println("Sent packet " + packet.getSeqNum() + " successfully.");
        packet.setTimeStamp(System.currentTimeMillis());
    }

    /**
     * Builds a payload of random bytes sized so that header plus payload fit
     * into one datagram of {@code MAX_SIZE} bytes.
     *
     * @return the random payload
     */
    private byte[] generateDataPacket() {
        byte[] data = new byte[MAX_SIZE - Packet.HEADER_SIZE];
        java.util.concurrent.ThreadLocalRandom.current().nextBytes(data);
        return data;
    }

    /** Receives cumulative ACKs and slides the send window forward. */
    private class AckReceiverThread implements Runnable {
        @Override
        public void run() {
            byte[] buffer = new byte[MAX_SIZE];
            DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
            while (!finished) {
                try {
                    datagramPacket.setLength(buffer.length);
                    socket.receive(datagramPacket);
                } catch (IOException e) {
                    if (finished || socket.isClosed()) {
                        return; // normal shutdown: start() closed the socket
                    }
                    System.err.println("Failed to receive ACK: " + e);
                    continue;
                }
                // Parse only the bytes that were actually received.
                byte[] received = new byte[datagramPacket.getLength()];
                System.arraycopy(datagramPacket.getData(), datagramPacket.getOffset(),
                        received, 0, received.length);
                Packet ackPacket;
                try {
                    ackPacket = Packet.fromBytes(received);
                } catch (RuntimeException e) {
                    // Bytes from the network are untrusted; a parse failure is corruption.
                    System.out.println("Received a corrupted ACK packet.");
                    continue;
                }
                if (!ackPacket.isChecksumCorrect()) {
                    System.out.println("Received a corrupted ACK packet.");
                    continue;
                }
                int ack = ackPacket.getAckNum();
                System.out.println("Received ACK " + ack + ".");
                synchronized (lock) {
                    // Accept only ACKs for packets currently in flight; anything
                    // else is a duplicate or bogus and must not move the window.
                    if (ack >= base && ack < nextSeq) {
                        for (int seq = base; seq <= ack; seq++) {
                            packets[seq % WINDOW_SIZE] = null;
                        }
                        base = ack + 1;
                        lock.notifyAll();
                    }
                }
            }
        }
    }
}
```
Receiver.java:
```java
import java.io.IOException;
import java.net.*;
/**
 * Receiving side of a reliable transfer layered on top of UDP.
 *
 * <p>The receiver accepts packets strictly in order and acknowledges them
 * cumulatively. A corrupted packet is dropped silently. A duplicate or
 * out-of-order packet is dropped and answered with a repeat of the last
 * in-order ACK, so a lost ACK cannot stall the sender.
 *
 * <p>ACKs are sent to the source address and port of the datagram being
 * answered, unless an explicit target was configured through
 * {@link #setSenderAddress(InetAddress)} or {@link #setSenderPort(int)}.
 *
 * <p>An instance is single-use: {@link #start()} closes the socket when it returns.
 */
public class Receiver {
    private static final int PORT = 1234; // port this receiver listens on
    private static final int MAX_SIZE = 1024; // maximum datagram size in bytes
    // Number of packets in one transfer; the last expected sequence number is 1023.
    private static final int TOTAL_PACKETS = 1024;
    // How long to keep answering retransmissions after the last packet, in
    // milliseconds, in case the final ACK was lost.
    private static final int LINGER_TIMEOUT = 2000;

    private final DatagramSocket socket; // UDP socket used for data and ACKs
    private InetAddress senderAddress; // optional explicit ACK target address
    private int senderPort; // optional explicit ACK target port; 0 means "not set"
    private int expectedSeq; // next in-order sequence number
    private final boolean[] received; // received[seq] is true once packet seq was accepted
    private boolean finished; // true once every packet has been accepted

    /**
     * Creates a receiver bound to {@code PORT}.
     *
     * @throws SocketException if the port cannot be bound
     */
    public Receiver() throws SocketException {
        socket = new DatagramSocket(PORT);
        expectedSeq = 0;
        received = new boolean[TOTAL_PACKETS];
        finished = false;
    }

    /**
     * Receives packets until all {@code TOTAL_PACKETS} have arrived in order,
     * then lingers briefly to re-acknowledge late retransmissions and closes
     * the socket.
     *
     * @throws IOException if receiving or sending fails
     */
    public void start() throws IOException {
        try {
            while (!finished) {
                DatagramPacket datagramPacket = new DatagramPacket(new byte[MAX_SIZE], MAX_SIZE);
                socket.receive(datagramPacket);
                handleDatagram(datagramPacket);
            }
            System.out.println("All packets have been received successfully.");
            lingerForRetransmissions();
        } finally {
            socket.close();
        }
    }

    /**
     * Validates one datagram and either accepts it or repeats the last ACK.
     *
     * @param datagramPacket the datagram that was just received
     * @throws IOException if an ACK cannot be sent
     */
    private void handleDatagram(DatagramPacket datagramPacket) throws IOException {
        // Parse only the bytes that were actually received.
        byte[] bytes = new byte[datagramPacket.getLength()];
        System.arraycopy(datagramPacket.getData(), datagramPacket.getOffset(),
                bytes, 0, bytes.length);
        Packet packet;
        try {
            packet = Packet.fromBytes(bytes);
        } catch (RuntimeException e) {
            // Bytes from the network are untrusted; a parse failure is corruption.
            System.out.println("Received a corrupted packet.");
            return;
        }
        if (!packet.isChecksumCorrect()) {
            System.out.println("Received a corrupted packet " + packet.getSeqNum() + ".");
            return;
        }

        InetAddress replyAddress =
                (senderAddress != null) ? senderAddress : datagramPacket.getAddress();
        int replyPort = (senderPort > 0) ? senderPort : datagramPacket.getPort();

        int seq = packet.getSeqNum();
        if (seq == expectedSeq && seq < TOTAL_PACKETS) {
            // The packet we were waiting for: accept, acknowledge, advance.
            storePacket(packet);
            sendAck(seq, replyAddress, replyPort);
            expectedSeq++;
            if (expectedSeq == TOTAL_PACKETS) {
                finished = true;
            }
            return;
        }

        if (seq >= expectedSeq) {
            System.out.println("Received an out-of-order packet " + seq
                    + " (expected " + expectedSeq + ").");
        } else {
            System.out.println("Received an old packet " + seq + ".");
        }
        // Repeat the cumulative ACK so the sender learns what has been delivered.
        if (expectedSeq > 0) {
            sendAck(expectedSeq - 1, replyAddress, replyPort);
        }
    }

    /**
     * Keeps answering retransmissions until the line has been silent for
     * {@code LINGER_TIMEOUT} milliseconds. Without this a lost final ACK would
     * leave the sender retransmitting to a receiver that no longer exists.
     *
     * @throws IOException if receiving or sending fails
     */
    private void lingerForRetransmissions() throws IOException {
        socket.setSoTimeout(LINGER_TIMEOUT);
        while (true) {
            DatagramPacket datagramPacket = new DatagramPacket(new byte[MAX_SIZE], MAX_SIZE);
            try {
                socket.receive(datagramPacket);
            } catch (SocketTimeoutException quiet) {
                return; // the sender has gone quiet, so it got the final ACK
            }
            handleDatagram(datagramPacket);
        }
    }

    /**
     * Records that the packet was accepted. The payload itself is not retained.
     *
     * @param packet an in-order packet whose sequence number is below {@code TOTAL_PACKETS}
     */
    private void storePacket(Packet packet) {
        int seq = packet.getSeqNum();
        received[seq] = true;
        System.out.println("Received packet " + seq + " successfully.");
    }

    /**
     * Sends a cumulative ACK.
     *
     * @param ack highest in-order sequence number received so far
     * @param address destination address
     * @param port destination port
     * @throws IOException if the datagram cannot be sent
     */
    private void sendAck(int ack, InetAddress address, int port) throws IOException {
        Packet ackPacket = new Packet(ack);
        byte[] buffer = ackPacket.toBytes();
        DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length, address, port);
        socket.send(datagramPacket);
        System.out.println("Sent ACK " + ack + " successfully.");
    }

    /**
     * Forces ACKs to be sent to the given address instead of the datagram source.
     *
     * @param senderAddress the ACK target, or {@code null} to reply to the datagram source
     */
    public void setSenderAddress(InetAddress senderAddress) {
        this.senderAddress = senderAddress;
    }

    /**
     * Forces ACKs to be sent to the given port instead of the datagram source port.
     *
     * @param senderPort the ACK target port, or {@code 0} to reply to the datagram source port
     */
    public void setSenderPort(int senderPort) {
        this.senderPort = senderPort;
    }
}
```
Packet.java:
```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
/**
 * Protocol data unit exchanged between sender and receiver.
 *
 * <p>Wire format (big-endian):
 * <pre>
 *   offset  0  int   sequence number
 *   offset  4  int   acknowledgement number
 *   offset  8  int   payload length in bytes
 *   offset 12  long  CRC-32 over the three fields above followed by the payload
 *   offset 20  payload
 * </pre>
 *
 * <p>The explicit length field lets {@link #fromBytes(byte[])} parse a receive
 * buffer that is larger than the datagram it holds. The time stamp is local
 * bookkeeping only and is never put on the wire.
 *
 * <p>The setters do not recompute the checksum; a packet modified through them
 * fails {@link #isChecksumCorrect()} until {@link #setChecksum(long)} is called
 * with a matching value.
 */
public class Packet {
    /** Number of header bytes that precede the payload on the wire. */
    public static final int HEADER_SIZE = 20;
    private static final int CRC_OFFSET = 12; // offset of the checksum field
    // A CRC-32 value is always in [0, 2^32), so -1 can never verify.
    private static final long INVALID_CHECKSUM = -1L;
    private static final byte[] NO_DATA = new byte[0];

    private int seqNum; // sequence number
    private int ackNum; // acknowledgement number
    private byte[] data; // payload, never null
    private long checksum; // CRC-32 stored in or read from the header
    private long timeStamp; // local send/receive time in milliseconds, not serialized

    /**
     * Creates a data packet and computes its checksum.
     *
     * @param seqNum sequence number
     * @param data payload; {@code null} is treated as an empty payload
     */
    public Packet(int seqNum, byte[] data) {
        this.seqNum = seqNum;
        this.data = (data == null) ? NO_DATA : data;
        calculateChecksum();
    }

    /**
     * Creates an ACK packet with an empty payload and computes its checksum.
     *
     * @param ackNum acknowledgement number
     */
    public Packet(int ackNum) {
        this.ackNum = ackNum;
        this.data = NO_DATA;
        calculateChecksum();
    }

    /**
     * Parses a packet from its wire representation. Bytes after the declared
     * payload are ignored, so a padded receive buffer may be passed directly.
     *
     * <p>Malformed input (too short, or a payload length that does not fit) does
     * not throw. It yields a packet for which {@link #isChecksumCorrect()} is
     * {@code false}, so callers can treat it like any other corrupted packet.
     *
     * @param bytes the received bytes
     * @return the parsed packet, carrying the checksum found in the header
     * @throws IllegalArgumentException if {@code bytes} is {@code null}
     */
    public static Packet fromBytes(byte[] bytes) {
        if (bytes == null) {
            throw new IllegalArgumentException("bytes must not be null");
        }
        if (bytes.length < HEADER_SIZE) {
            return malformed();
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int seqNum = buffer.getInt();
        int ackNum = buffer.getInt();
        int dataLength = buffer.getInt();
        long checksum = buffer.getLong();
        if (dataLength < 0 || dataLength > buffer.remaining()) {
            return malformed();
        }
        byte[] data = new byte[dataLength];
        buffer.get(data);
        Packet packet = new Packet(seqNum, data);
        packet.setAckNum(ackNum);
        // Keep the checksum from the wire so verification compares against it.
        packet.setChecksum(checksum);
        return packet;
    }

    /** Returns a placeholder packet that always fails checksum verification. */
    private static Packet malformed() {
        Packet packet = new Packet(-1, NO_DATA);
        packet.setAckNum(-1);
        packet.setChecksum(INVALID_CHECKSUM);
        return packet;
    }

    /**
     * Serializes the packet into its wire representation.
     *
     * @return a new array of {@code HEADER_SIZE + data.length} bytes
     */
    public byte[] toBytes() {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + data.length);
        buffer.putInt(seqNum);
        buffer.putInt(ackNum);
        buffer.putInt(data.length);
        buffer.putLong(checksum);
        buffer.put(data);
        return buffer.array();
    }

    /**
     * Verifies the stored checksum against the current field values.
     *
     * @return {@code true} if the stored checksum matches the recomputed CRC-32
     */
    public boolean isChecksumCorrect() {
        return computeChecksum() == checksum;
    }

    /** Recomputes the checksum from the current field values and stores it. */
    private void calculateChecksum() {
        checksum = computeChecksum();
    }

    /**
     * Computes the CRC-32 over every header field except the checksum itself,
     * followed by the payload. Both creation and verification use this method
     * so the two can never disagree about what is covered.
     */
    private long computeChecksum() {
        ByteBuffer header = ByteBuffer.allocate(CRC_OFFSET);
        header.putInt(seqNum);
        header.putInt(ackNum);
        header.putInt(data.length);
        CRC32 crc32 = new CRC32();
        crc32.update(header.array(), 0, CRC_OFFSET);
        crc32.update(data, 0, data.length);
        return crc32.getValue();
    }

    /** @return the sequence number */
    public int getSeqNum() {
        return seqNum;
    }

    /** @param seqNum the new sequence number; the checksum is not recomputed */
    public void setSeqNum(int seqNum) {
        this.seqNum = seqNum;
    }

    /** @return the acknowledgement number */
    public int getAckNum() {
        return ackNum;
    }

    /** @param ackNum the new acknowledgement number; the checksum is not recomputed */
    public void setAckNum(int ackNum) {
        this.ackNum = ackNum;
    }

    /** @return the payload, never {@code null} */
    public byte[] getData() {
        return data;
    }

    /** @param data the new payload, {@code null} meaning empty; the checksum is not recomputed */
    public void setData(byte[] data) {
        this.data = (data == null) ? NO_DATA : data;
    }

    /** @return the stored checksum */
    public long getChecksum() {
        return checksum;
    }

    /** @param checksum the checksum to store */
    public void setChecksum(long checksum) {
        this.checksum = checksum;
    }

    /** @return the local time stamp in milliseconds */
    public long getTimeStamp() {
        return timeStamp;
    }

    /** @param timeStamp the local time stamp in milliseconds */
    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
}
```
阅读全文