flink1.14 消费kafka
时间: 2023-05-28 11:05:57 浏览: 287
在 Flink 1.14 中,要消费 Kafka 中的数据,需要使用 Flink 的 Kafka Connector。以下是一个简单的 Flink 1.14 消费 Kafka 数据的示例:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-group");
// create a Kafka consumer
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);
// add the Kafka consumer as a source to the Flink program
DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);
// print the Kafka data to the console
kafkaDataStream.print();
// execute the Flink program
env.execute("Kafka Consumer Example");
}
}
```
在此示例中,我们首先设置了执行环境,然后配置了 Kafka 属性。接下来,我们创建了一个 FlinkKafkaConsumer 对象,并将其作为源添加到 Flink 程序中。最后,我们将 Kafka 数据打印到控制台并执行 Flink 程序。请注意,此示例中使用的是 SimpleStringSchema,这意味着数据以字符串形式传递。如果您的数据采用不同的格式,请相应地更改序列化程序。
注意:在使用 Flink 1.14 消费 Kafka 数据时,需要确保与您使用的 Kafka 版本兼容的 Flink Kafka Connector 版本。Flink 官方文档提供了有关版本兼容性的详细信息。
阅读全文
相关推荐
![zip](https://img-home.csdnimg.cn/images/20241231045053.png)
![tgz](https://img-home.csdnimg.cn/images/20250102104920.png)
![rar](https://img-home.csdnimg.cn/images/20241231044955.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![zip](https://img-home.csdnimg.cn/images/20241231045053.png)
![tgz](https://img-home.csdnimg.cn/images/20250102104920.png)