基于Flink获取kafka数据
时间: 2024-01-13 20:05:13 浏览: 84
基于flink的推荐系统,实时获取kafka数据进行数据清洗,离线计算进行文件读取(文件,mongodb,hbase)
可以使用 Flink 的 Kafka Consumer API 来获取 Kafka 中的数据。下面是一个基本的示例代码:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class FlinkKafkaConsumerExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
env.addSource(consumer).print();
env.execute("Flink Kafka Consumer Example");
}
}
```
在上面的代码中,我们使用 FlinkKafkaConsumer 来创建一个 Kafka 消费者。我们需要指定 Kafka 服务器的地址和端口,以及我们要消费的主题。然后,我们将消费者作为数据源添加到 Flink 的执行环境中,并打印消费到的数据。最后,我们调用 `execute` 方法来执行 Flink 作业。
阅读全文