帮我写一个flink连接kafka得代码
时间: 2024-05-01 17:16:56 浏览: 9
以下是一个简单的Flink连接Kafka的代码示例:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class FlinkKafkaDemo {
public static void main(String[] args) throws Exception {
// 设置Flink流式处理环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka配置
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-kafka-demo");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建FlinkKafkaConsumer
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
// 将Kafka数据源添加到Flink流中
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
// 打印Kafka数据源中的数据
kafkaStream.print();
// 执行Flink程序
env.execute("Flink Kafka Demo");
}
}
```
在以上代码中,我们首先设置了Flink的流式处理环境,然后设置Kafka的相关配置,并创建了一个FlinkKafkaConsumer。接着,我们将Kafka数据源添加到Flink流中,并打印出来。最后,我们执行Flink程序。
需要注意的是,以上代码中使用了SimpleStringSchema来序列化和反序列化Kafka中的数据,你可以根据实际情况选择其他的Schema。此外,你还需要根据实际情况修改Kafka的相关配置。