flink使用kafka作为source的示例
时间: 2023-05-25 21:01:47 浏览: 108
A:下面是一个使用Apache Flink和Apache Kafka作为source的示例:
```java
import java.util.Properties;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class KafkaSourceExample {
public static void main(String[] args) throws Exception {
// 设置运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka参数
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
Properties properties = parameterTool.getProperties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink_consumer_group");
// 从Kafka中读取数据
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
"topic_name", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(kafkaConsumer);
// 打印结果
stream.print();
// 开始执行程序
env.execute("Kafka Source Example");
}
}
```
在此示例中,我们创建了一个FlinkKafkaConsumer,并将其传递给addSource方法以从Kafka主题中读取数据。最后,我们通过将结果流打印到控制台来查看数据。
阅读全文