请写一个flink消费kafka的demo
时间: 2023-10-10 14:13:24 浏览: 89
以下是一个基本的Flink消费Kafka的示例代码:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafkaConsumerDemo {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置Kafka连接参数
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
//创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
//从最早开始消费
consumer.setStartFromEarliest();
//将Kafka消费者添加到执行环境
DataStream<String> stream = env.addSource(consumer);
//打印输出
stream.print();
//执行任务
env.execute("Flink Kafka Consumer Demo");
}
}
```
在此示例中,我们首先创建了Flink的执行环境,然后设置了Kafka的连接参数,并创建了一个Kafka消费者。我们还指定了从最早的消息开始消费。
接下来,我们将Kafka消费者添加到执行环境中,并从中创建一个数据流。最后,我们打印输出数据流,并执行Flink任务。
请注意,此示例中的代码仅用于演示目的,并且可能需要进行修改以适应您的特定情况。
阅读全文