flink sql 读取kafka 多个表 demo 详细
时间: 2023-08-09 10:05:12 浏览: 143
flink读取kafka数据.zip
以下是一个使用 Flink SQL 读取 Kafka 多个表的示例代码:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;
import java.util.Properties;
public class KafkaMultiTableDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义 Kafka 连接信息
String topic1 = "topic1";
String topic2 = "topic2";
String bootstrapServers = "localhost:9092";
String groupId = "group1";
Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
// 定义 Kafka 表 schema
Schema schema1 = new Schema()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT());
Schema schema2 = new Schema()
.field("id", DataTypes.INT())
.field("address", DataTypes.STRING())
.field("phone", DataTypes.STRING());
// 注册 Kafka 表
tableEnv
.connect(new Kafka()
.version("universal")
.topic(topic1)
.properties(props)
.startFromLatest())
.withFormat(new Avro().recordClass(User.class))
.withSchema(schema1)
.createTemporaryTable("user");
tableEnv
.connect(new Kafka()
.version("universal")
.topic(topic2)
.properties(props)
.startFromLatest())
.withFormat(new Csv())
.withSchema(schema2)
.createTemporaryTable("contact");
// 查询多个 Kafka 表
String sql = "SELECT u.id, u.name, c.address, c.phone " +
"FROM user AS u JOIN contact AS c ON u.id = c.id";
// 执行查询
tableEnv.executeSql(sql).print();
}
public static class User {
public int id;
public String name;
public int age;
public User() {}
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
}
```
上述代码中,我们使用 Kafka 和 Avro 格式读取了名为 `topic1` 的 Kafka topic,使用 Csv 格式读取了名为 `topic2` 的 Kafka topic,并将它们注册为 `user` 和 `contact` 两个表。然后,我们通过 JOIN 操作将这两个表连接起来,并查询了它们的部分字段。最后,我们执行了整个查询并将结果打印出来。
需要注意的是,上述代码中使用了 Avro 格式读取 Kafka topic,因此需要引入对应的 Avro 依赖库。如果不需要使用 Avro 格式,可以使用其他格式(如 Csv)替换。
阅读全文