用flink datastream 写一个程序,分析一个mysql表 写入另一个MySQL表,每隔1min执行一次
时间: 2023-12-14 13:40:23 浏览: 257
可以按照以下步骤进行:
1. 引入必要的依赖
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
```
其中,`${flink.version}` 和 `${mysql.version}` 分别为 Flink 和 MySQL 驱动的版本号,可以根据实际情况进行修改。
2. 编写 Flink 程序
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcSource;
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.connectors.jdbc.JdbcTableSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.Jdbc;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class FlinkMySQLJob {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取数据源
JdbcTableSource source = JdbcTableSource.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test")
.setUsername("root")
.setPassword("123456")
.setQuery("select * from user")
.setRowtimeAttribute("update_time", new AscendingTimestampExtractor<Row>() {
@Override
public long extractAscendingTimestamp(Row row) {
return ((Timestamp) row.getField(3)).getTime();
}
})
.build();
// 注册数据源
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerTableSource("user", source);
// 定义输出表结构
List<String> fieldNames = new ArrayList<>();
fieldNames.add("id");
fieldNames.add("name");
fieldNames.add("age");
fieldNames.add("update_time");
List<DataTypes> fieldTypes = new ArrayList<>();
fieldTypes.add(DataTypes.INT());
fieldTypes.add(DataTypes.STRING());
fieldTypes.add(DataTypes.INT());
fieldTypes.add(DataTypes.TIMESTAMP());
// 定义输出连接器
JdbcTableSink sink = JdbcTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test")
.setUsername("root")
.setPassword("123456")
.setTableName("user_copy")
.setBatchSize(1000)
.build();
// 定义输出表
Schema schema = new Schema()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.field("update_time", DataTypes.TIMESTAMP());
tableEnv.connect(new ConnectorDescriptor("jdbc", new Jdbc())
.property("driver", "com.mysql.jdbc.Driver")
.property("url", "jdbc:mysql://localhost:3306/test")
.property("username", "root")
.property("password", "123456"))
.withSchema(schema)
.withFormat(new org.apache.flink.table.formats.Json())
.createTemporaryTable("user_copy");
// 执行 SQL 并写入结果到输出表
Table table = tableEnv.sqlQuery("select id, name, age, update_time from user");
tableEnv.toRetractStream(table, Row.class).addSink(sink);
// 执行程序
env.execute("Flink MySQL Job");
}
}
```
上述代码中,我们首先通过 `JdbcTableSource` 获取 MySQL 数据源,并将其注册为 Flink 表 `user`。然后我们定义了输出表结构和连接器,并使用 `JdbcTableSink` 将结果写入 MySQL 表 `user_copy`。最后,我们通过 `tableEnv.toRetractStream` 方法将查询结果转化为 `DataStream<Row>`,并将其写入输出表。
3. 配置定时执行任务
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class FlinkMySQLJob {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// ...
// 定时执行任务
env.addSource(new SourceFunction<Long>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning) {
ctx.collect(System.currentTimeMillis());
TimeUnit.MINUTES.sleep(1);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).addSink(new SinkFunction<Long>() {
@Override
public void invoke(Long value, Context context) throws Exception {
tableEnv.sqlUpdate("insert into user_copy select id, name, age, update_time from user");
}
});
// 执行程序
env.execute("Flink MySQL Job");
}
}
```
在上述代码中,我们使用 `env.addSource` 方法定义一个源,每隔 1 分钟发射一个时间戳。然后我们使用 `env.addSink` 方法在每个时间戳触发时执行 SQL 语句,将查询结果写入输出表。需要注意的是,在这个过程中,我们需要维护一个 volatile 的布尔型变量 `isRunning`,以便在取消任务时停止源的发射。
完整代码如下:
阅读全文