用java写一个用KafkaProducer发送消息,并保证at least once的代码
时间: 2024-05-07 20:22:30 浏览: 14
下面是Java代码示例,使用KafkaProducer发送消息并保证at least once:
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
private final static String TOPIC_NAME = "test-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Configure the producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Ensure at least once delivery
props.put(ProducerConfig.RETRIES_CONFIG, 3); // Number of retries
Producer<String, String> producer = new KafkaProducer<>(props);
// Create and send a message
String messageValue = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
try {
RecordMetadata metadata = producer.send(record).get(); // Wait for the message to be acknowledged
System.out.printf("Message sent to partition %d with offset %d\n", metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
System.err.println("Error sending message: " + e.getMessage());
}
producer.close();
}
}
```
在上面的代码中,我们指定了以下属性:
- `BOOTSTRAP_SERVERS_CONFIG`:指定Kafka集群的地址
- `KEY_SERIALIZER_CLASS_CONFIG` 和 `VALUE_SERIALIZER_CLASS_CONFIG`:指定消息键和值的序列化器
- `ACKS_CONFIG`:确保至少一次传递消息
- `RETRIES_CONFIG`:指定重试次数
然后,我们创建一个`KafkaProducer`实例,并使用`send`方法发送消息。我们使用`get`方法等待消息被确认,以确保消息至少传递一次。最后,我们关闭生产者。
相关推荐
![java](https://img-home.csdnimg.cn/images/20210720083646.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)