flink读取kafka
时间: 2023-09-05 16:10:07 浏览: 105
Flink 可以通过 Flink Kafka Consumer 来读取 Kafka 中的数据。下面是一个简单的示例:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
stream.print();
env.execute("Kafka Flink Demo");
}
}
```
在上述示例中,我们创建了一个 FlinkKafkaConsumer 对象,它从名为 "test-topic" 的 Kafka 主题中读取数据。然后,我们通过 env.addSource 方法将其添加到 Flink 流中,并最终打印出来。
阅读全文