kafka 幂等性生产者 代码
时间: 2023-06-30 15:17:33 浏览: 105
第11单元 KafkaApi实战1
以下是使用Kafka幂等性生产者发送消息的示例代码:
```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ProducerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ProduceResponse;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaIdempotentProducer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "idempotent-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);
producer.send(record);
}
producer.flush();
producer.close();
}
}
```
在上述代码中,我们创建了一个Kafka生产者对象,并将`enable.idempotence`配置项设置为`true`,表示开启Kafka生产者的幂等性。接着,我们通过`send()`方法向指定的主题发送消息。由于Kafka生产者的幂等性机制,我们可以放心地发送重复的消息,Kafka broker 会自动去重。
需要注意的是,使用Kafka幂等性生产者时,需要将`acks`配置项设置为 `all`,以保证消息被成功写入到所有副本之后才返回确认消息。同时,需要将`max.in.flight.requests.per.connection`配置项设置为1,以避免出现消息重排的情况。
阅读全文