用 C/C++ 写一个带有回调函数的 RabbitMQ 订阅者代码(基于 rabbitmq-c 客户端库)
时间: 2023-03-07 14:22:57 浏览: 67
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

/* Names used when the corresponding command-line argument is not supplied. */
static const char default_exchange[] = "test_exchange";
static const char default_binding_key[] = "test_key";
static const char default_queue[] = "test_queue";

/* Signature of the user callback invoked once per delivered message body. */
typedef void (*message_callback)(const void *body, size_t len);

/* The callback to run for each delivery plus a running delivery counter. */
struct consumer_state {
    message_callback cb;
    unsigned long message_count;
};

/* Default callback: writes the raw message body to stdout, then a newline. */
static void print_message(const void *body, size_t len)
{
    /* fwrite rather than printf("%s"): the body is not NUL-terminated. */
    fwrite(body, 1, len, stdout);
    fputc('\n', stdout);
    fflush(stdout);
}

/* Returns 1 for a normal broker reply; otherwise logs "<what> failed" and returns 0. */
static int reply_ok(amqp_rpc_reply_t reply, const char *what)
{
    if (reply.reply_type == AMQP_RESPONSE_NORMAL) {
        return 1;
    }
    fprintf(stderr, "%s failed\n", what);
    return 0;
}

/**
 * RabbitMQ subscriber that hands every received message body to a callback.
 *
 * Usage: program [exchange [bindingkey [queue]]]
 *
 * Connects to localhost:5672 as guest/guest on vhost "/", declares a direct
 * exchange and a queue, binds them with the binding key, and then consumes
 * messages until amqp_consume_message reports a non-normal reply. Each
 * delivery is passed to the callback and then acknowledged.
 *
 * Returns 0 after an orderly shutdown of the consume loop, -1 if any step of
 * the connection/topology setup fails. Diagnostics go to stderr so they do
 * not mix with message bodies printed on stdout.
 */
int main(int argc, char const *argv[])
{
    int port = 5672;
    char const *hostname = "localhost";
    char const *exchange = argc > 1 ? argv[1] : default_exchange;
    char const *bindingkey = argc > 2 ? argv[2] : default_binding_key;
    char const *queue = argc > 3 ? argv[3] : default_queue;
    struct consumer_state callback_data = { .cb = print_message, .message_count = 0 };
    amqp_connection_state_t conn;
    amqp_socket_t *socket = NULL;
    int rc = -1;

    conn = amqp_new_connection();
    if (!conn) {
        fprintf(stderr, "creating connection failed\n");
        return -1;
    }

    /* The socket is owned by conn and is released by amqp_destroy_connection. */
    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        fprintf(stderr, "creating socket failed\n");
        goto destroy;
    }

    if (amqp_socket_open(socket, hostname, port)) {
        fprintf(stderr, "opening socket failed\n");
        goto destroy;
    }

    if (!reply_ok(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                             "guest", "guest"),
                  "login")) {
        goto destroy;
    }

    /* The calls below return pointers; their outcome is read via amqp_get_rpc_reply. */
    amqp_channel_open(conn, 1);
    if (!reply_ok(amqp_get_rpc_reply(conn), "opening channel")) {
        goto close_connection;
    }

    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange),
                          amqp_cstring_bytes("direct"), 0, 0, 0, 0,
                          amqp_empty_table);
    if (!reply_ok(amqp_get_rpc_reply(conn), "declaring exchange")) {
        goto close_connection;
    }

    amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 0, 0, 0,
                       amqp_empty_table);
    if (!reply_ok(amqp_get_rpc_reply(conn), "declaring queue")) {
        goto close_connection;
    }

    amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue),
                    amqp_cstring_bytes(exchange),
                    amqp_cstring_bytes(bindingkey), amqp_empty_table);
    if (!reply_ok(amqp_get_rpc_reply(conn), "binding queue")) {
        goto close_connection;
    }

    /* no_local = 0, no_ack = 0 (deliveries are acked below), exclusive = 0. */
    amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue), amqp_empty_bytes,
                       0, 0, 0, amqp_empty_table);
    if (!reply_ok(amqp_get_rpc_reply(conn), "starting consumer")) {
        goto close_connection;
    }

    for (;;) {
        amqp_envelope_t envelope;
        amqp_rpc_reply_t res;

        amqp_maybe_release_buffers(conn);

        res = amqp_consume_message(conn, &envelope, NULL, 0);
        if (AMQP_RESPONSE_NORMAL != res.reply_type) {
            break;
        }

        callback_data.message_count++;
        callback_data.cb(envelope.message.body.bytes, envelope.message.body.len);

        /* Acknowledge only after the callback has run. */
        if (amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0) != 0) {
            fprintf(stderr, "acknowledging message failed\n");
            amqp_destroy_envelope(&envelope);
            break;
        }

        amqp_destroy_envelope(&envelope);
    }

    fprintf(stderr, "consumer stopped after %lu message(s)\n",
            callback_data.message_count);
    rc = 0;

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
close_connection:
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
destroy:
    amqp_destroy_connection(conn);
    return rc;
}