使用c/c++和UDP协议写一个图像传输库,不会丢包的,使用多线程,使用命令控制是否接收和发送
时间: 2023-11-22 07:56:33 浏览: 64
以下是一个基本的UDP图像传输库的代码示例,它使用多线程进行发送和接收,并包含命令控制选项:
```c++
#include <iostream>
#include <cstring>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#define MAX_PACKET_SIZE 65507
class ImageTransmitter {
public:
ImageTransmitter() :
m_send_socket(-1), m_receive_socket(-1),
m_send_thread(nullptr), m_receive_thread(nullptr),
m_send_stop(false), m_receive_stop(false) {}
~ImageTransmitter() {
Stop();
}
bool Init(const char* ip_address, int port) {
if (m_send_socket >= 0 || m_receive_socket >= 0) {
std::cerr << "ImageTransmitter already initialized." << std::endl;
return false;
}
// Create the send socket
m_send_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (m_send_socket < 0) {
std::cerr << "Failed to create send socket: " << strerror(errno) << std::endl;
return false;
}
// Create the receive socket
m_receive_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (m_receive_socket < 0) {
std::cerr << "Failed to create receive socket: " << strerror(errno) << std::endl;
return false;
}
// Bind the receive socket to the specified address and port
struct sockaddr_in receive_address;
memset(&receive_address, 0, sizeof(receive_address));
receive_address.sin_family = AF_INET;
receive_address.sin_addr.s_addr = htonl(INADDR_ANY);
receive_address.sin_port = htons(port);
if (bind(m_receive_socket, (struct sockaddr*)&receive_address, sizeof(receive_address)) < 0) {
std::cerr << "Failed to bind receive socket: " << strerror(errno) << std::endl;
return false;
}
// Set the send socket to non-blocking mode
int flags = fcntl(m_send_socket, F_GETFL, 0);
if (flags < 0) {
std::cerr << "Failed to get send socket flags: " << strerror(errno) << std::endl;
return false;
}
if (fcntl(m_send_socket, F_SETFL, flags | O_NONBLOCK) < 0) {
std::cerr << "Failed to set send socket to non-blocking mode: " << strerror(errno) << std::endl;
return false;
}
// Set the receive socket to non-blocking mode
flags = fcntl(m_receive_socket, F_GETFL, 0);
if (flags < 0) {
std::cerr << "Failed to get receive socket flags: " << strerror(errno) << std::endl;
return false;
}
if (fcntl(m_receive_socket, F_SETFL, flags | O_NONBLOCK) < 0) {
std::cerr << "Failed to set receive socket to non-blocking mode: " << strerror(errno) << std::endl;
return false;
}
// Set the remote address and port for the send socket
memset(&m_send_address, 0, sizeof(m_send_address));
m_send_address.sin_family = AF_INET;
m_send_address.sin_addr.s_addr = inet_addr(ip_address);
m_send_address.sin_port = htons(port);
return true;
}
void Start() {
if (!m_send_thread) {
m_send_thread = new std::thread(&ImageTransmitter::SendThreadFunc, this);
}
if (!m_receive_thread) {
m_receive_thread = new std::thread(&ImageTransmitter::ReceiveThreadFunc, this);
}
}
void Stop() {
if (m_send_thread) {
m_send_stop = true;
m_send_thread->join();
delete m_send_thread;
m_send_thread = nullptr;
}
if (m_receive_thread) {
m_receive_stop = true;
m_receive_thread->join();
delete m_receive_thread;
m_receive_thread = nullptr;
}
if (m_send_socket >= 0) {
close(m_send_socket);
m_send_socket = -1;
}
if (m_receive_socket >= 0) {
close(m_receive_socket);
m_receive_socket = -1;
}
}
void SetSendEnabled(bool enabled) {
std::unique_lock<std::mutex> lock(m_send_mutex);
m_send_enabled = enabled;
m_send_cond.notify_all();
}
void SetReceiveEnabled(bool enabled) {
std::unique_lock<std::mutex> lock(m_receive_mutex);
m_receive_enabled = enabled;
m_receive_cond.notify_all();
}
bool IsSendEnabled() const {
return m_send_enabled;
}
bool IsReceiveEnabled() const {
return m_receive_enabled;
}
private:
void SendThreadFunc() {
while (!m_send_stop) {
std::unique_lock<std::mutex> lock(m_send_mutex);
while (!m_send_enabled && !m_send_stop) {
m_send_cond.wait(lock);
}
if (m_send_stop) {
break;
}
// Get the next packet from the queue
std::vector<uint8_t> packet;
{
std::unique_lock<std::mutex> lock(m_send_queue_mutex);
if (!m_send_queue.empty()) {
packet = m_send_queue.front();
m_send_queue.pop();
}
}
// Send the packet
if (!packet.empty()) {
ssize_t sent = sendto(m_send_socket, packet.data(), packet.size(), 0,
(struct sockaddr*)&m_send_address, sizeof(m_send_address));
if (sent < 0) {
if (errno != EWOULDBLOCK) {
std::cerr << "Failed to send packet: " << strerror(errno) << std::endl;
}
}
}
}
}
void ReceiveThreadFunc() {
while (!m_receive_stop) {
std::unique_lock<std::mutex> lock(m_receive_mutex);
while (!m_receive_enabled && !m_receive_stop) {
m_receive_cond.wait(lock);
}
if (m_receive_stop) {
break;
}
// Receive a packet
uint8_t buffer[MAX_PACKET_SIZE];
struct sockaddr_in sender_address;
socklen_t sender_address_size = sizeof(sender_address);
ssize_t received = recvfrom(m_receive_socket, buffer, MAX_PACKET_SIZE, 0,
(struct sockaddr*)&sender_address, &sender_address_size);
if (received < 0) {
if (errno != EWOULDBLOCK) {
std::cerr << "Failed to receive packet: " << strerror(errno) << std::endl;
}
} else {
// Add the packet to the receive queue
std::vector<uint8_t> packet(buffer, buffer + received);
std::unique_lock<std::mutex> lock(m_receive_queue_mutex);
m_receive_queue.push(std::make_pair(packet, sender_address));
}
}
}
int m_send_socket;
int m_receive_socket;
struct sockaddr_in m_send_address;
std::thread* m_send_thread;
std::thread* m_receive_thread;
std::mutex m_send_mutex;
std::mutex m_receive_mutex;
std::condition_variable m_send_cond;
std::condition_variable m_receive_cond;
bool m_send_enabled;
bool m_receive_enabled;
std::queue<std::vector<uint8_t>> m_send_queue;
std::mutex m_send_queue_mutex;
std::queue<std::pair<std::vector<uint8_t>, struct sockaddr_in>> m_receive_queue;
std::mutex m_receive_queue_mutex;
bool m_send_stop;
bool m_receive_stop;
};
```
该库使用两个UDP套接字:一个用于发送数据,另一个用于接收数据。通过 `Init` 方法初始化套接字,并通过 `Start` 方法启动发送和接收线程。可以使用 `Stop` 方法停止发送和接收线程,并释放套接字。
可以使用 `SetSendEnabled` 和 `SetReceiveEnabled` 方法控制是否启用发送和接收。如果禁用发送,则发送线程将等待直到启用发送或停止发送。如果禁用接收,则接收线程将等待直到启用接收或停止接收。
图像数据以字节序列的形式传输。发送线程将数据分成多个数据包,并将它们添加到发送队列中。接收线程从接收队列中获取数据包,并将它们合并为完整的图像数据。由于UDP协议不保证数据包的顺序,接收线程必须对数据包进行排序和合并,以确保接收到正确的图像数据。
使用以下代码示例演示如何使用该库进行图像传输:
```c++
#include <opencv2/opencv.hpp>
int main(int argc, char** argv) {
// Initialize the image transmitter
ImageTransmitter transmitter;
if (!transmitter.Init("127.0.0.1", 1234)) {
return 1;
}
// Start the transmitter
transmitter.Start();
// Load the image
cv::Mat image = cv::imread("example.jpg");
// Convert the image to a byte vector
std::vector<uint8_t> image_data;
cv::imencode(".jpg", image, image_data);
// Send the image data
const int packet_size = 1024;
for (size_t i = 0; i < image_data.size(); i += packet_size) {
std::vector<uint8_t> packet(image_data.begin() + i, image_data.begin() + i + packet_size);
std::unique_lock<std::mutex> lock(transmitter.m_send_queue_mutex);
transmitter.m_send_queue.push(packet);
}
// Enable sending and receiving
transmitter.SetSendEnabled(true);
transmitter.SetReceiveEnabled(true);
// Wait for the image to be received
while (true) {
std::unique_lock<std::mutex> lock(transmitter.m_receive_queue_mutex);
if (!transmitter.m_receive_queue.empty()) {
std::pair<std::vector<uint8_t>, struct sockaddr_in> packet = transmitter.m_receive_queue.front();
transmitter.m_receive_queue.pop();
if (packet.first.size() == image_data.size() && memcmp(packet.first.data(), image_data.data(), image_data.size()) == 0) {
std::cout << "Image received successfully." << std::endl;
break;
}
}
}
// Disable sending and receiving
transmitter.SetSendEnabled(false);
transmitter.SetReceiveEnabled(false);
// Stop the transmitter
transmitter.Stop();
return 0;
}
```
该示例将一个图像作为字节序列发送到本地IP地址 127.0.0.1 和端口号 1234。在发送之前,它将图像数据分成多个数据包,并将它们添加到发送队列中。在启用发送和接收之后,它等待接收到完整的图像数据,并检查它是否与发送的数据相同。最后,它停止发送和接收,并释放套接字。
相关推荐
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)