flink kafka exactly once 代码
时间: 2023-06-08 09:09:09 浏览: 173
exactly once semantics in kafka
对于Flink的Kafka消费者,可以使用以下代码实现exactly-once语义:
1. 启用 Flink 检查点并使用 CheckpointingMode.EXACTLY_ONCE 模式;消费者端使用合适的 DeserializationSchema(例如 SimpleStringSchema),并将 isolation.level 设为 read_committed,以便只读取已提交事务中的数据。
2. 创建 FlinkKafkaProducer 时指定 FlinkKafkaProducer.Semantic.EXACTLY_ONCE 语义,这样 Flink 就会使用 Kafka 事务(与检查点绑定提交)来确保数据仅被写出一次。
下面是示例代码:
```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * End-to-end exactly-once pipeline: reads strings from {@code input_topic},
 * lower-cases them, and writes them transactionally to {@code output_topic}.
 *
 * <p>Exactly-once here relies on three pieces working together:
 * <ul>
 *   <li>Flink checkpointing in {@link CheckpointingMode#EXACTLY_ONCE} mode;</li>
 *   <li>the consumer reading with {@code isolation.level=read_committed};</li>
 *   <li>the producer using {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE},
 *       which wraps writes in Kafka transactions committed on checkpoint
 *       completion.</li>
 * </ul>
 */
public class FlinkKafkaExactlyOnce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka transactions are committed when a checkpoint completes, so
        // EXACTLY_ONCE checkpointing is mandatory for the transactional sink.
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        // Offsets are tracked in Flink checkpoints, not via Kafka auto-commit.
        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Only consume records from committed upstream Kafka transactions.
        kafkaProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Note: key/value deserializers need not be set here; FlinkKafkaConsumer
        // always uses ByteArrayDeserializer internally and applies the
        // DeserializationSchema (SimpleStringSchema below) itself.

        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        // transaction.timeout.ms must be larger than the maximum expected
        // checkpoint interval/duration, and must not exceed the broker's
        // transaction.max.timeout.ms (15 min by default). 10 minutes here.
        producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input_topic", new SimpleStringSchema(), kafkaProps);
        consumer.setStartFromEarliest();
        // Expose checkpointed offsets to Kafka for monitoring (not for recovery).
        consumer.setCommitOffsetsOnCheckpoints(true);

        // A null partitioner means Kafka's default partitioning is used. This
        // overload (partitioner + semantic + pool size) is the one that accepts
        // a plain SerializationSchema together with an explicit Semantic.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output_topic",
                new SimpleStringSchema(),
                producerProps,
                null,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

        env.addSource(consumer)
           .map(String::toLowerCase)
           .addSink(producer);

        env.execute();
    }
}
```
在此代码中,我们通过启用 EXACTLY_ONCE 模式的检查点,并为 FlinkKafkaProducer 指定 FlinkKafkaProducer.Semantic.EXACTLY_ONCE 语义,来确保消费数据和生产数据仅被处理一次。同时,我们通过 ProducerConfig.TRANSACTION_TIMEOUT_CONFIG(即 transaction.timeout.ms)延长事务超时时间,以免事务在两次检查点之间因超时被中止;该值不得超过 broker 端的 transaction.max.timeout.ms(默认 15 分钟)。
阅读全文