rabbitmqtopic模式c++
时间: 2023-11-23 10:45:59 浏览: 29
使用 RabbitMQ 实现 Topic 模式需要使用 AMQP 协议的 C++ 客户端库,可以使用 RabbitMQ-C++ 客户端库来实现。
首先需要安装 RabbitMQ-C++ 客户端库,可以通过以下命令进行安装:
```
sudo apt-get install librabbitmq-dev
```
然后可以使用以下代码来实现 Topic 模式的发送和接收:
```cpp
#include <iostream>
#include <string>
#include <cstring>
#include <amqp.h>
#include <amqp_tcp_socket.h>
int main(int argc, char *argv[]) {
const char *hostname = "localhost";
int port = 5672;
const char *username = "guest";
const char *password = "guest";
const char *exchange = "topic_logs";
const char *routing_key = "example.key";
const char *message = "Hello, world!";
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
std::cerr << "Error creating TCP socket" << std::endl;
return 1;
}
int status = amqp_socket_open(socket, hostname, port);
if (status) {
std::cerr << "Error opening TCP socket" << std::endl;
return 1;
}
amqp_rpc_reply_t login_reply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
if (login_reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error logging in" << std::endl;
return 1;
}
amqp_channel_open(conn, 1);
amqp_rpc_reply_t channel_reply = amqp_get_rpc_reply(conn);
if (channel_reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error opening channel" << std::endl;
return 1;
}
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes("topic"), 0, 0, 0, 0, amqp_empty_table);
amqp_rpc_reply_t exchange_reply = amqp_get_rpc_reply(conn);
if (exchange_reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error declaring exchange" << std::endl;
return 1;
}
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG | AMQP_BASIC_TIMESTAMP_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; // persistent delivery mode
props.reply_to = amqp_cstring_bytes("amq.rabbitmq.reply-to");
props.correlation_id = amqp_cstring_bytes("1");
props.timestamp = amqp_cstring_bytes("2022-01-01T00:00:00.000Z");
amqp_bytes_t message_bytes = amqp_cstring_bytes(message);
amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routing_key), 0, 0, &props, message_bytes);
amqp_rpc_reply_t publish_reply = amqp_get_rpc_reply(conn);
if (publish_reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error publishing message" << std::endl;
return 1;
}
amqp_basic_consume(conn, 1, amqp_cstring_bytes("queue"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_rpc_reply_t consume_reply = amqp_get_rpc_reply(conn);
if (consume_reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error consuming message" << std::endl;
return 1;
}
while (1) {
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
amqp_rpc_reply_t ret = amqp_consume_message(conn, &envelope, NULL, 0);
if (ret.reply_type == AMQP_RESPONSE_NORMAL) {
std::string message((char *) envelope.message.body.bytes, (char *) envelope.message.body.bytes + envelope.message.body.len);
std::cout << "Received message: " << message << std::endl;
amqp_destroy_envelope(&envelope);
}
}
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
```
在该代码中,首先创建了一个 AMQP 连接和一个 TCP Socket。然后使用 `amqp_login` 函数进行用户登录和身份验证。接下来打开一个 AMQP channel,并使用 `amqp_exchange_declare` 函数声明 Topic 类型的 exchange。然后使用 `amqp_basic_publish` 函数发布一条消息到 exchange,并使用 `amqp_basic_consume` 函数订阅一个 queue。最后使用 `amqp_consume_message` 函数接收和处理消息。
需要注意的是,Topic 模式的 routing_key 需要使用通配符来匹配不同的消息。可以使用 `*` 匹配一个单词,或使用 `#` 匹配零个或多个单词。例如,`example.*` 可以匹配 `example.key1` 和 `example.key2`,`example.#` 可以匹配 `example.key1`、`example.key2` 和 `example.key3.key4` 等。