写一个Flink消费kafka数据的代码
时间: 2024-04-29 15:25:30 浏览: 116
以下是一个简单的 Flink 消费 Kafka 数据的代码示例:
```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
// 初始化 Flink 执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 Kafka 配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test-group");
// 创建 FlinkKafkaConsumer 并指定要消费的 topic 和序列化方式
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props);
// 添加消费者到执行环境中
DataStream<String> stream = env.addSource(consumer);
// 对消费到的数据进行处理
stream.map(str -> "Received: " + str).print();
// 执行 Flink 程序
env.execute("Kafka Consumer Example");
}
}
```
这个代码示例使用 FlinkKafkaConsumer 类从 Kafka 中消费数据,并将消费到的数据打印到控制台上。在这个示例中,我们使用了简单的字符串序列化方式。在实际应用中,您可能需要根据您的数据类型选择适当的序列化方式。
阅读全文