写一个flink实时消费kafka程序
时间: 2023-08-19 16:51:59 浏览: 110
好的,以下是一个使用 Flink 实时消费 Kafka 的 Java 示例程序:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 Kafka 连接参数
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// 创建 Kafka 消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// 添加 Kafka 消费者到 Flink 程序中
DataStream<String> stream = env.addSource(consumer);
// 对接收到的数据进行处理
stream.print();
env.execute("Kafka Consumer Example");
}
}
```
以上程序会实时消费名为 `my-topic` 的 Kafka Topic 中的数据,并将其输出到控制台。你可以根据自己的需求进行修改。注意需要引入 Flink 和 Kafka 的相关依赖库。
阅读全文