java flink读取kafka
时间: 2023-09-05 14:09:55 浏览: 96
要使用 Apache Flink 读取 Kafka 数据,你可以按照以下步骤进行操作:
1. 首先需要在 pom.xml 中添加 flink-connector-kafka 依赖:
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
```
2. 在 Flink 程序中创建 Kafka 数据源:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSourceExample {
public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set up the properties for the Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// create a Kafka data source
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// add the Kafka data source to the execution environment
DataStream<String> stream = env.addSource(consumer);
// print the data stream
stream.print();
// execute the Flink program
env.execute("Kafka Source Example");
}
}
```
在上面的示例中,我们使用 `FlinkKafkaConsumer` 创建 Kafka 数据源。`FlinkKafkaConsumer` 需要传入三个参数:要消费的 topic 名称、序列化/反序列化器和 Kafka 消费者属性。
3. 运行 Flink 程序:
在运行 Flink 程序之前,需要确保已经启动了 Kafka 服务,并且已经创建了要消费的 topic。
```bash
$ bin/flink run /path/to/your/program.jar
```
运行 Flink 程序后,它会从 Kafka 中读取数据并打印到控制台。
阅读全文