flink怎样消费kafka数据
时间: 2023-08-27 19:05:00 浏览: 117
Apache Flink提供了一个Kafka Consumer API,可以用来消费Kafka数据。实现步骤如下:
1.引入依赖
在pom.xml文件中添加Flink-Kafka依赖:
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
```
2.创建Kafka Consumer
使用Flink提供的Kafka Consumer API创建一个Kafka Consumer,并指定所需的配置信息,如Kafka的Broker地址、Topic名称、序列化方式等。
```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
```
3.将Consumer添加到Flink程序中
将创建的Kafka Consumer添加到Flink程序中,通过addSource()方法实现:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);
```
4.处理数据
经过上述步骤,就可以从Kafka中消费数据,并将其作为DataStream对象传递到下一个算子中进行处理。
```
stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
}).print();
```
5.启动程序
最后,通过调用execute()方法来启动Flink程序。
```
env.execute("Kafka Consumer");
```
完整示例代码:
```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
}).print();
env.execute("Kafka Consumer");
```
阅读全文