flink sql单元测试代码示例
时间: 2023-07-09 21:37:16 浏览: 190
以下是一个基于 Flink SQL API 的单元测试代码示例,该代码测试了一个简单的 Flink SQL 查询:
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.types.Row;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class FlinkSqlUnitTest {
@Test
public void testFlinkSql() throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 创建测试数据
env.setParallelism(1);
env.fromElements(Tuple2.of("Alice", 23), Tuple2.of("Bob", 35), Tuple2.of("Charlie", 40))
.toTable(tEnv, "name, age");
// 执行测试任务
Table result = tEnv.sqlQuery("SELECT * FROM name_age WHERE age > 30");
// 验证测试结果
tEnv.toRetractStream(result, Row.class)
.returns(Types.TUPLE(Types.BOOLEAN, Types.ROW_NAMED(new String[]{"name", "age"}, Types.STRING, Types.INT)))
.map(tuple -> tuple.f1)
.returns(Types.ROW_NAMED(new String[]{"name", "age"}, Types.STRING, Types.INT))
.addSink(new StreamSinkFunction<>());
// 执行测试任务
env.execute();
}
}
```
该代码使用 Flink 的 StreamTableEnvironment,创建了一个简单的表,然后执行了一个 Flink SQL 查询,查询年龄大于 30 的人的信息。最后,使用 Flink 的 TestingUtils 工具类来验证测试结果是否正确。
阅读全文