Flink Table API: reading multiple Kafka tables (demo, Flink 1.15.0, Maven)
You can use the Flink Table API and Flink SQL to read multiple tables from Kafka. Below is an example built with Flink 1.15.0 and Maven.
First, add the following dependencies to your pom.xml:
```xml
<dependencies>
    <!-- Since Flink 1.15, most artifacts are Scala-free and no longer carry a _2.12 suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- The planner still ships with a Scala suffix in 1.15. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Required at runtime for the 'json' format used below. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Needed to run the job from the IDE. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.15.0</version>
    </dependency>
</dependencies>
```
Then, the following code declares two Kafka tables and joins them:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class FlinkKafkaMultiTableDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Since 1.15 the Blink planner is the only planner, so useBlinkPlanner() no longer exists.
        final EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        final String catalogName = "my_catalog";
        final String databaseName = "my_database";

        // Register an in-memory catalog and make it the current one.
        final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        // Shared Kafka connection settings. The SQL Kafka connector takes these
        // through the WITH clause, so no separate Properties object is needed.
        final String kafkaTopic1 = "topic1";
        final String kafkaTopic2 = "topic2";
        final String bootstrapServers = "localhost:9092";
        final String groupId = "my_group";
        final String format = "json";

        // Create the first Kafka table. 'scan.startup.mode' = 'earliest-offset'
        // is the SQL connector's equivalent of auto.offset.reset=earliest.
        final String table1Name = "table1";
        final String table1DDL = String.format(
                "CREATE TABLE %s (%s) WITH ("
                        + "'connector' = 'kafka', "
                        + "'topic' = '%s', "
                        + "'properties.bootstrap.servers' = '%s', "
                        + "'properties.group.id' = '%s', "
                        + "'scan.startup.mode' = 'earliest-offset', "
                        + "'format' = '%s')",
                table1Name, "name STRING", kafkaTopic1, bootstrapServers, groupId, format);
        tableEnv.executeSql(table1DDL);

        // Create the second Kafka table the same way.
        final String table2Name = "table2";
        final String table2DDL = String.format(
                "CREATE TABLE %s (%s) WITH ("
                        + "'connector' = 'kafka', "
                        + "'topic' = '%s', "
                        + "'properties.bootstrap.servers' = '%s', "
                        + "'properties.group.id' = '%s', "
                        + "'scan.startup.mode' = 'earliest-offset', "
                        + "'format' = '%s')",
                table2Name, "age INT, gender STRING", kafkaTopic2, bootstrapServers, groupId, format);
        tableEnv.executeSql(table2DDL);

        // Join the two tables. The join key (name = gender) is only there to
        // demonstrate the syntax; use a real shared key in practice.
        final String query = String.format(
                "SELECT t1.name, t2.age, t2.gender FROM %s t1 JOIN %s t2 ON t1.name = t2.gender",
                table1Name, table2Name);
        final Table result = tableEnv.sqlQuery(query);
        result.printSchema();

        // toDataStream replaces the deprecated toAppendStream in 1.15.
        tableEnv.toDataStream(result).print();
        env.execute("Flink Kafka Multi-Table Demo");
    }
}
```
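For the job to print anything, the messages on topic1 and topic2 must be JSON objects matching the declared schemas. For example (hypothetical sample records, the first for topic1 and the second for topic2):
```json
{"name": "female"}
{"age": 30, "gender": "female"}
```
With the demo's join condition `t1.name = t2.gender`, this particular pair would match and appear in the output.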
In the code above, an in-memory catalog is first created and registered with the TableEnvironment. The two Kafka tables are then declared with CREATE TABLE DDL using the SQL Kafka connector and registered in the catalog via executeSql.
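To confirm that both tables ended up in the current catalog and database, a quick check is possible with the standard SHOW TABLES statement (a minimal sketch):
```java
// Lists the tables registered in the current catalog/database;
// both "table1" and "table2" should appear in the output.
tableEnv.executeSql("SHOW TABLES").print();
```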
Finally, a SQL query joins the two tables and the result is printed. Note that toDataStream converts the Table into a DataStream of Row so it can be printed with the DataStream API; it replaces toAppendStream, which is deprecated in 1.15.
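If you prefer the Table API over SQL strings, the same join can be expressed roughly as follows (a sketch assuming the table names registered above):
```java
import static org.apache.flink.table.api.Expressions.$;

Table t1 = tableEnv.from("table1");
Table t2 = tableEnv.from("table2");
// Equivalent to the SQL join above, expressed with the expression DSL.
Table joined = t1.join(t2)
        .where($("name").isEqual($("gender")))
        .select($("name"), $("age"), $("gender"));
```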
Hope this example helps!