kafka 生产者 异步发送,并返回发送结果
时间: 2023-07-19 21:46:04 浏览: 169
在C#中,可以使用`ProduceAsync`方法来异步发送消息到Kafka,然后使用`Task.Wait()`来等待消息发送完成并获取结果。以下是一个简单的示例代码:
```csharp
using Confluent.Kafka;
using System;
using System.Threading.Tasks;
public class KafkaProducer
{
private readonly IProducer<string, string> _producer;
public KafkaProducer(string bootstrapServers)
{
var config = new ProducerConfig { BootstrapServers = bootstrapServers };
_producer = new ProducerBuilder<string, string>(config).Build();
}
public async Task<DeliveryResult<string, string>> PublishAsync(string topic, string message)
{
var deliveryResult = await _producer.ProduceAsync(topic, new Message<string, string> { Key = null, Value = message });
Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
return deliveryResult;
}
public void Dispose()
{
_producer.Dispose();
}
}
```
在这个例子中,我们添加了一个名为 `PublishAsync` 的异步方法,该方法返回 `Task<DeliveryResult<string, string>>` 对象。在这个方法中,我们使用 `ProduceAsync` 方法来异步发送消息到 Kafka,并在消息发送完成后返回 `DeliveryResult` 对象。请注意,我们在 `PublishAsync` 方法中使用了 `await` 关键字,以便异步等待消息发送完成。
在使用时,我们可以使用以下代码示例来异步发布消息并获取结果:
```csharp
var producer = new KafkaProducer("localhost:9092");
var deliveryResult = producer.PublishAsync("test-topic", "Hello, Kafka!").Result;
producer.Dispose();
```
在这个示例中,我们使用 `PublishAsync` 方法异步发布消息,并使用 `Result` 属性等待消息发送完成并获取结果。请注意,如果消息发送失败,将会抛出 `ProduceException` 异常。
阅读全文