基于udp实可靠的数据传输程序,java代码,server端和client端
时间: 2024-12-14 12:20:35 浏览: 18
基于UDP协议实现可靠数据传输在Java中通常需要借助一些额外手段来补偿UDP的不可靠特性,因为UDP是无连接、非确认的,它并不保证数据包的顺序到达或丢失后的重传。一种常见的做法是使用自定义的序号和确认机制,以及超时重传策略。
服务器端(Server)Java代码示例:
```java
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
public class UDPServer {
private final DatagramSocket serverSocket;
private final ConcurrentHashMap<Long, Future<String>> pendingRequests = new ConcurrentHashMap<>();
public UDPServer(int port) throws IOException {
serverSocket = new DatagramSocket(port);
}
public void startReceiving() {
try {
byte[] buffer = new byte[1024];
while (true) {
InetSocketAddress clientAddress = (InetSocketAddress) serverSocket.receive(buffer);
long sequenceNumber = ByteBuffer.wrap(buffer).getLong();
String message = new String(buffer, StandardCharsets.UTF_8);
handleRequest(sequenceNumber, message, clientAddress);
}
} catch (IOException e) {
System.err.println("Error receiving data: " + e.getMessage());
}
}
private void handleRequest(long seqNum, String data, InetSocketAddress clientAddress) {
// 实现请求处理,例如添加到任务队列并设置超时重试逻辑
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() {
// 这里可以对数据做处理,然后返回响应序列号
return "Response for Sequence Number: " + seqNum;
}
});
pendingRequests.put(seqNum, task);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(task);
// 发送应答,这里只是一个简化的例子
byte[] response = ("Response to " + data).getBytes(StandardCharsets.UTF_8);
sendResponse(response, clientAddress);
}
private void sendResponse(byte[] response, InetSocketAddress address) {
try {
DatagramPacket packet = new DatagramPacket(response, response.length, address);
serverSocket.send(packet);
} catch (IOException e) {
System.err.println("Failed to send response: " + e.getMessage());
}
}
// ...其他方法及关闭逻辑...
}
```
客户端(Client)Java代码示例:
```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
public class UDPClient {
private static final int REQUEST_TIMEOUT_MS = 5000; // 5秒超时
public static void main(String[] args) throws IOException, InterruptedException {
InetAddress ipAddress = InetAddress.getByName("localhost");
int port = 9999;
String message = "Hello Server";
long sequenceNumber = System.currentTimeMillis(); // 暂时用当前时间作为序号
byte[] request = ("Request: " + sequenceNumber + ", " + message).getBytes(StandardCharsets.UTF_8);
sendRequest(request, ipAddress, port);
String response = receiveResponse(ipAddress, port, sequenceNumber, REQUEST_TIMEOUT_MS);
System.out.println("Received response: " + response);
}
private static void sendRequest(byte[] request, InetAddress address, int port) {
DatagramSocket socket = new DatagramSocket();
DatagramPacket packet = new DatagramPacket(request, request.length, address, port);
socket.send(packet);
}
private static String receiveResponse(InetAddress address, int port, long expectedSequenceNumber, long timeout) throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, address, port);
socket.receive(packet);
if (ByteBuffer.wrap(packet.getData()).getLong() == expectedSequenceNumber) {
return new String(packet.getData(), StandardCharsets.UTF_8);
}
}
throw new RuntimeException("Timeout waiting for response from the server");
}
}
```
阅读全文