flink sql 读取kafka 多个表 demo maven
时间: 2024-03-03 19:48:59 浏览: 79
Flink实时读取Kafka数据批量聚合(定时按数量)写入Mysql.rar
5星 · 资源好评率100%
好的,下面是一个使用 Flink SQL 读取 Kafka 多个表的示例 Maven 依赖和代码:
1. Maven 依赖:
```xml
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON 格式支持:代码中的 Json() 描述符需要该依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
```
2. 代码示例:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
 * Demo: register two Kafka topics as Flink SQL tables via the descriptor API
 * and join them with a SQL query, printing the retract stream of the result.
 *
 * NOTE(review): the {@code connect()} / descriptor API used here was deprecated
 * in Flink 1.11 — on newer versions prefer {@code tableEnv.executeSql("CREATE TABLE ...")}.
 * The JSON format below additionally requires the {@code flink-json} Maven artifact.
 */
public class KafkaMultiTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 定义 Schema。字段类型工厂方法位于 DataTypes(静态工厂类);
        // DataType 是抽象的类型值类,并没有 INT()/STRING() 方法。
        // 字段名称和类型需和 Kafka 消息中的 JSON 字段一致。
        Schema schema1 = new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT());
        Schema schema2 = new Schema()
                .field("id", DataTypes.INT())
                .field("gender", DataTypes.STRING())
                .field("address", DataTypes.STRING());

        // 通过描述符把两个 topic 注册为临时表。
        // 注意:不要再额外用 FlinkKafkaConsumer/addSource 消费同样的 topic——
        // 表连接器自己管理消费;相同 group.id 的重复消费者会瓜分分区。
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("topic1")
                        .startFromEarliest()
                        .property("bootstrap.servers", "localhost:9092")
                        .property("group.id", "test-group"))
                .withFormat(new Json())
                .withSchema(schema1)
                .createTemporaryTable("table1");
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("topic2")
                        .startFromEarliest()
                        .property("bootstrap.servers", "localhost:9092")
                        .property("group.id", "test-group"))
                .withFormat(new Json())
                .withSchema(schema2)
                .createTemporaryTable("table2");

        // 使用 Flink SQL 按 id 关联两张表
        Table result = tableEnv.sqlQuery(
                "SELECT t1.id, t1.name, t2.gender, t2.address "
                        + "FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");

        // 输出结果。toRetractStream 的第二个参数是目标类型的 Class/TypeInformation,
        // 不是 DataType;Row.class 适用于任意 SQL 结果行。
        result.printSchema();
        tableEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}
```
注意事项:
- Flink Kafka Connector、Flink Table/SQL 模块以及 Scala 后缀(如 _2.12)的版本需保持一致。
- 使用 Json() 格式描述符还需要在 pom 中引入 flink-json 依赖。
- Schema 的字段名称和类型需和 Kafka 消息中的 JSON 字段一致;字段类型通过 DataTypes(而非 DataType)的静态工厂方法创建。
- connect()/描述符 API 自 Flink 1.11 起已废弃,新版本建议改用 executeSql("CREATE TABLE ...") 的 DDL 方式注册表。
- Flink SQL 查询多个表时,使用 JOIN 连接;双流 JOIN 默认会持续累积状态,生产环境需配置状态 TTL 或使用区间 JOIN。
阅读全文