flink 写kafka
时间: 2023-07-06 15:40:10 浏览: 122
要使用 Flink 从 Kafka 中读取数据或者将数据写入 Kafka,需要先将 Flink 的 Kafka 连接器添加到 Flink 应用程序的依赖中。可以在 Flink 的官方文档中找到详细的说明和示例代码。
以下是一个简单的 Flink 应用程序示例,它从 Kafka 主题中读取字符串消息,并将它们写入到另一个 Kafka 主题中:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 Kafka 消费者
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "test-group");
consumerProps.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new StringDeserializer(), consumerProps);
DataStream<String> input = env.addSource(consumer);
// 数据处理
DataStream<String> output = input.map(str -> str.toUpperCase());
// 配置 Kafka 生产者
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
KafkaSerializationSchema<String> schema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<>("output-topic", element.getBytes());
}
};
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("output-topic", schema, producerProps, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
// 将数据写入 Kafka
output.addSink(producer);
env.execute("Kafka Example");
}
}
```
该示例中,首先配置了一个 Kafka 消费者来读取输入主题中的消息。然后对消息进行处理,将其转换成大写字母。最后,使用一个 Kafka 生产者将处理后的消息写入到输出主题中。
在实际的应用程序中,需要根据具体的需求调整配置和实现方式。同时,还需要注意 Flink 与 Kafka 版本的兼容性问题,以及应用程序的容错机制和性能优化等方面的考虑。
阅读全文