C#实现rabbitmq六大模式辅助类写出一个包括消费队列包含确认回复后才队列才消除
时间: 2024-02-05 13:12:59 浏览: 22
以下是一个基本的 RabbitMQ 辅助类,它可以实现 RabbitMQ 的六种模式,并包含确认回复后才队列才消除的消费队列:
```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMQHelper
{
public class RabbitMQHelper
{
private IConnection _connection;
private IModel _channel;
public RabbitMQHelper(string hostName, string userName, string password)
{
var factory = new ConnectionFactory
{
HostName = hostName,
UserName = userName,
Password = password,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
public void Send(string queueName, string message)
{
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
}
public void Receive(string queueName, Action<string> processMessage)
{
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
processMessage(message);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public void PublishSubscribeSend(string exchangeName, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
}
public void PublishSubscribeReceive(string exchangeName, Action<string> processMessage)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
processMessage(message);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public void RoutingSend(string exchangeName, string routingKey, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
}
public void RoutingReceive(string exchangeName, string routingKey, Action<string> processMessage)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
processMessage(message);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public void TopicSend(string exchangeName, string routingKey, string message)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
}
public void TopicReceive(string exchangeName, string routingKey, Action<string> processMessage)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
processMessage(message);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public void HeadersSend(string exchangeName, string message, string headerKey, string headerValue)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Headers);
var body = Encoding.UTF8.GetBytes(message);
var headers = new Dictionary<string, object>();
headers.Add(headerKey, headerValue);
var properties = _channel.CreateBasicProperties();
properties.Headers = headers;
_channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: properties, body: body);
}
public void HeadersReceive(string exchangeName, string headerKey, string headerValue, Action<string> processMessage)
{
_channel.ExchangeDeclare(exchangeName, ExchangeType.Headers);
var queueName = _channel.QueueDeclare().QueueName;
var headers = new Dictionary<string, object>();
headers.Add(headerKey, headerValue);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "", arguments: headers);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
processMessage(message);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public void Close()
{
_channel.Close();
_connection.Close();
}
}
}
```
在消费队列中,添加了确认回复后队列才消除的代码:
```csharp
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
```
这段代码会在消费者接收到消息后,向 RabbitMQ 服务器发送确认回复。如果在处理消息的过程中出现异常,可以使用 `_channel.BasicNack()` 方法发送一个否定的确认回复,告诉 RabbitMQ 服务器该消息未被正确处理,需要重新发送。