flink消费kafka数据
时间: 2023-08-27 14:05:19 浏览: 124
在 Flink 中使用 Kafka 作为数据源可以通过 Flink 的 Kafka Consumer 进行实现。下面是一个简单的 Flink 消费 Kafka 数据的代码示例:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafkaConsumerExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 Kafka Consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// 创建 FlinkKafkaConsumer
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), props);
// 将 Kafka Consumer 添加到 Flink 程序中
DataStream<String> stream = env.addSource(consumer);
// TODO: 在这里添加对数据流的处理逻辑
// 执行 Flink 程序
env.execute("Flink Kafka Consumer Example");
}
}
```
上述代码中,我们首先使用 `StreamExecutionEnvironment` 创建执行环境。然后,我们设置 Kafka Consumer 的配置信息,并使用 `FlinkKafkaConsumer` 创建一个 Kafka Consumer 对象。最后,我们将 Kafka Consumer 添加到 Flink 程序中,使用 `env.addSource()` 方法将 Kafka 数据流转换为 Flink 数据流。接下来,我们可以在 `TODO` 标记处添加对数据流的处理逻辑。最后,我们调用 `env.execute()` 方法来执行 Flink 程序。
阅读全文