flink sql 读取kafka demo 详细 和maven配置
时间: 2023-08-09 15:05:12 浏览: 107
flink 实现将kafka的数据读取出来,并经过xml配置和重写算子方式实现数据处理
4星 · 用户满意度95%
以下是 Flink SQL 读取 Kafka 的 Demo 详细步骤和 Maven 配置:
1. 在 pom.xml 文件中添加以下依赖:
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
```
其中,${scala.binary.version} 表示 Scala 的二进制版本,${flink.version} 表示 Flink 的版本号。
2. 在 Flink SQL 程序中,可以通过以下方式读取 Kafka 数据:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义 Kafka 数据源
String kafkaTopic = "input-topic";
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "test-group");
// 创建 Kafka 数据源表
tableEnv.connect(new Kafka()
.version("universal")
.topic(kafkaTopic)
.properties(kafkaProps)
.startFromEarliest()
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.BIGINT())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT()))
.inAppendMode()
.createTemporaryTable("inputTable"); // 注册表名
// 查询 Kafka 数据
Table result = tableEnv.sqlQuery("SELECT id, name, age FROM inputTable WHERE age > 18");
// 输出结果到控制台
tableEnv.toRetractStream(result, Row.class).print();
// 执行程序
env.execute();
```
其中,Kafka 数据源通过 connect 方法创建,使用 withFormat 和 withSchema 方法指定数据格式和数据模式,inAppendMode 表示数据源是追加模式,createTemporaryTable 方法用于注册表名。最后通过 sqlQuery 方法查询数据,toRetractStream 方法输出结果到控制台。
以上是 Flink SQL 读取 Kafka 的 Demo 详细步骤和 Maven 配置,根据实际情况可以进行调整。
阅读全文