C++给我写一个服务端客户端按需发布订阅代码
时间: 2024-03-05 16:50:10 浏览: 119
以下是一个简单的 C++ 代码示例,实现基于发布订阅模式的服务端和客户端:
服务端代码:
```cpp
#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
// 服务端监听的端口号
const int PORT = 8888;
// 最大连接数
const int MAX_CONN = 5;
// 服务端接收消息的缓冲区大小
const int BUFFER_SIZE = 1024;
// 发布-订阅模式中的“主题”
std::string topic;
// 已连接的客户端列表
std::vector<SOCKET> clients;
// 客户端消息队列
std::vector<std::string> message_queue;
// 互斥锁和条件变量,用于线程同步
std::mutex mtx;
std::condition_variable cv;
// 线程函数,用于接收客户端消息并广播给所有已连接的客户端
void broadcast_messages()
{
while (true)
{
// 等待条件变量被唤醒
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !message_queue.empty(); });
// 从消息队列中取出一条消息
std::string message = message_queue.front();
message_queue.erase(message_queue.begin());
// 广播消息给所有已连接的客户端
for (auto client : clients)
{
send(client, message.c_str(), message.size(), 0);
}
}
}
int main()
{
// 初始化 Winsock 库
WSADATA wsaData;
int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (ret != 0)
{
std::cerr << "WSAStartup failed with error: " << ret << std::endl;
return -1;
}
// 创建一个 TCP socket 对象
SOCKET server_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (server_socket == INVALID_SOCKET)
{
std::cerr << "Failed to create server socket with error: " << WSAGetLastError() << std::endl;
WSACleanup();
return -1;
}
// 绑定 IP 地址和端口号
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
ret = bind(server_socket, (sockaddr*)&server_addr, sizeof(server_addr));
if (ret == SOCKET_ERROR)
{
std::cerr << "Failed to bind server socket with error: " << WSAGetLastError() << std::endl;
closesocket(server_socket);
WSACleanup();
return -1;
}
// 开始监听,等待客户端连接
ret = listen(server_socket, MAX_CONN);
if (ret == SOCKET_ERROR)
{
std::cerr << "Failed to listen on server socket with error: " << WSAGetLastError() << std::endl;
closesocket(server_socket);
WSACleanup();
return -1;
}
std::cout << "Server started listening on port " << PORT << std::endl;
// 创建一个线程,用于接收客户端消息并广播给所有已连接的客户端
std::thread broadcast_thread(broadcast_messages);
while (true)
{
// 接受客户端连接请求
sockaddr_in client_addr;
int addr_len = sizeof(client_addr);
SOCKET client_socket = accept(server_socket, (sockaddr*)&client_addr, &addr_len);
if (client_socket == INVALID_SOCKET)
{
std::cerr << "Failed to accept client connection with error: " << WSAGetLastError() << std::endl;
continue;
}
std::cout << "Accepted a client connection from " << inet_ntoa(client_addr.sin_addr) << std::endl;
// 服务端向客户端发送欢迎消息
std::string welcome_message = "Welcome to the server!";
send(client_socket, welcome_message.c_str(), welcome_message.size(), 0);
// 将客户端加入已连接客户端列表
clients.push_back(client_socket);
// 循环接收客户端消息
while (true)
{
// 接收客户端消息
char buffer[BUFFER_SIZE] = { 0 };
int recv_len = recv(client_socket, buffer, BUFFER_SIZE, 0);
if (recv_len == SOCKET_ERROR)
{
std::cerr << "Failed to receive data from client with error: " << WSAGetLastError() << std::endl;
break;
}
else if (recv_len == 0)
{
std::cout << "Client disconnected." << std::endl;
break;
}
// 打印客户端消息
std::cout << "Message from client: " << buffer << std::endl;
// 将客户端消息加入消息队列
std::string message = "[" + topic + "] " + buffer;
message_queue.push_back(message);
// 唤醒线程,广播消息给所有已连接的客户端
cv.notify_all();
}
// 从已连接客户端列表中删除该客户端
clients.erase(std::remove(clients.begin(), clients.end(), client_socket), clients.end());
// 关闭客户端连接
closesocket(client_socket);
}
// 关闭服务端 socket
closesocket(server_socket);
// 等待线程结束
broadcast_thread.join();
// 清理 Winsock 库
WSACleanup();
return 0;
}
```
客户端代码:
```cpp
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
// 服务端 IP 地址和端口号
const char* SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 8888;
// 客户端发送消息的缓冲区大小
const int BUFFER_SIZE = 1024;
// 发布-订阅模式中的“主题”
std::string topic;
// 互斥锁和条件变量,用于线程同步
std::mutex mtx;
std::condition_variable cv;
// 线程函数,用于接收服务端广播的消息
void receive_messages(SOCKET client_socket)
{
while (true)
{
// 接收服务端广播的消息
char buffer[BUFFER_SIZE] = { 0 };
int recv_len = recv(client_socket, buffer, BUFFER_SIZE, 0);
if (recv_len == SOCKET_ERROR)
{
std::cerr << "Failed to receive data from server with error: " << WSAGetLastError() << std::endl;
break;
}
else if (recv_len == 0)
{
std::cout << "Server disconnected." << std::endl;
break;
}
// 打印收到的消息
std::cout << "Message from server: " << buffer << std::endl;
}
}
int main()
{
// 初始化 Winsock 库
WSADATA wsaData;
int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (ret != 0)
{
std::cerr << "WSAStartup failed with error: " << ret << std::endl;
return -1;
}
// 创建一个 TCP socket 对象
SOCKET client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (client_socket == INVALID_SOCKET)
{
std::cerr << "Failed to create client socket with error: " << WSAGetLastError() << std::endl;
WSACleanup();
return -1;
}
// 连接服务端
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(SERVER_IP);
server_addr.sin_port = htons(SERVER_PORT);
ret = connect(client_socket, (sockaddr*)&server_addr, sizeof(server_addr));
if (ret == SOCKET_ERROR)
{
std::cerr << "Failed to connect to server with error: " << WSAGetLastError() << std::endl;
closesocket(client_socket);
WSACleanup();
return -1;
}
std::cout << "Connected to server " << SERVER_IP << ":" << SERVER_PORT << std::endl;
// 接收服务端欢迎消息
char buffer[BUFFER_SIZE] = { 0 };
int recv_len = recv(client_socket, buffer, BUFFER_SIZE, 0);
if (recv_len == SOCKET_ERROR)
{
std::cerr << "Failed to receive data from server with error: " << WSAGetLastError() << std::endl;
closesocket(client_socket);
WSACleanup();
return -1;
}
else if (recv_len == 0)
{
std::cout << "Server disconnected." << std::endl;
closesocket(client_socket);
WSACleanup();
return -1;
}
// 打印服务端欢迎消息
std::cout << "Message from server: " << buffer << std::endl;
// 循环发送消息
while (true)
{
// 从控制台读取用户输入的消息
std::cout << "请输入消息:";
std::string message;
std::getline(std::cin, message);
// 如果用户输入 quit,则退出循环
if (message == "quit")
{
break;
}
// 将消息发送给服务端
std::string full_message = "[" + topic + "] " + message;
send(client_socket, full_message.c_str(), full_message.size(), 0);
// 创建一个线程,用于接收服务端广播的消息
std::thread receive_thread(receive_messages, client_socket);
// 等待线程结束
receive_thread.join();
}
// 关闭客户端 socket
closesocket(client_socket);
// 清理 Winsock 库
WSACleanup();
return 0;
}
```
此代码实现了一个简单的发布订阅模式,服务端接收客户端消息后会将其广播给所有已连接的客户端。如果您需要更灵活的订阅机制,可以使用一些成熟的消息队列中间件,例如 RabbitMQ 或 Apache Kafka。
阅读全文