C#实现rabbitmq六大模式辅助类生产者和消费者代码示例并包括消费队列包含确认回复后才队列才消除
时间: 2024-02-05 22:13:04 浏览: 134
以下是C#实现rabbitmq六大模式辅助类生产者和消费者代码示例,包括消费队列包含确认回复后才队列才消除的部分:
生产者代码:
```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
public class RabbitMQProducer
{
private readonly string _exchangeName;
private readonly string _routingKey;
private readonly string _queueName;
private readonly IConnectionFactory _connectionFactory;
public RabbitMQProducer(string exchangeName, string routingKey, string queueName, IConnectionFactory connectionFactory)
{
_exchangeName = exchangeName;
_routingKey = routingKey;
_queueName = queueName;
_connectionFactory = connectionFactory;
}
public void Publish(string message)
{
using (var connection = _connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct, durable: true);
channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(_exchangeName, _routingKey, properties, body);
}
}
}
```
消费者代码:
```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
public class RabbitMQConsumer
{
private readonly string _queueName;
private readonly IConnectionFactory _connectionFactory;
public RabbitMQConsumer(string queueName, IConnectionFactory connectionFactory)
{
_queueName = queueName;
_connectionFactory = connectionFactory;
}
public void Consume(Action<string> onMessageReceived)
{
using (var connection = _connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
onMessageReceived(message);
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(_queueName, autoAck: false, consumer: consumer);
}
}
}
```
消费队列包含确认回复后才队列才消除的部分代码:
```csharp
channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>
{
{ "x-message-ttl", 60000 }, // 设置消息过期时间为1分钟
{ "x-dead-letter-exchange", "dead-letter-exchange" }, // 消息过期后进入的交换机
{ "x-dead-letter-routing-key", "dead-letter-queue" } // 消息过期后进入的队列
});
channel.BasicConsume(_queueName, autoAck: false, consumer: consumer);
```
在消费者的Received事件中添加判断消息是否过期:
```csharp
if (ea.BasicProperties.Expiration != null && DateTime.UtcNow > ea.BasicProperties.Timestamp.AddMilliseconds(Convert.ToDouble(ea.BasicProperties.Expiration)))
{
channel.BasicReject(ea.DeliveryTag, requeue: false); // 消息过期,拒绝并且不重新入队
}
else
{
onMessageReceived(message);
channel.BasicAck(ea.DeliveryTag, multiple: false);
}
```
阅读全文