flink写入 mysql_flink sql实时计算当天pv写入mysql
时间: 2023-10-14 13:19:09 浏览: 152
要将Flink实时计算的结果写入MySQL数据库中,可以使用Flink的JDBC连接器。以下是一个将当天PV写入MySQL的示例代码:
```java
// 定义MySQL连接信息
final String jdbcUrl = "jdbc:mysql://localhost:3306/test";
final String username = "root";
final String password = "root";
// 定义Flink数据流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据流
DataStream<String> input = env.socketTextStream("localhost", 9999);
// 对数据流进行处理,计算当天PV
DataStream<Tuple2<String, Integer>> result = input
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
// 解析日志数据,获取访问时间和页面URL
String[] fields = value.split(",");
String time = fields[0];
String url = fields[1];
// 计算当天日期
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String today = sdf.format(new Date());
// 判断日志时间是否在当天,如果是则返回页面URL和1,否则返回空
if (time.contains(today)) {
return new Tuple2<>(url, 1);
} else {
return null;
}
}
})
.filter(Objects::nonNull)
.keyBy(0)
.sum(1);
// 将结果写入MySQL
result.addSink(JdbcSink.sink(
"insert into pv(url, pv, date) values (?, ?, ?)",
new JdbcStatementBuilder<Tuple2<String, Integer>>() {
@Override
public void accept(PreparedStatement preparedStatement, Tuple2<String, Integer> t) throws SQLException {
// 设置SQL参数
preparedStatement.setString(1, t.f0);
preparedStatement.setInt(2, t.f1);
preparedStatement.setDate(3, new Date(System.currentTimeMillis()));
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(jdbcUrl)
.withUsername(username)
.withPassword(password)
.build()));
// 启动数据流处理
env.execute();
```
该代码首先定义了MySQL的连接信息,然后创建了一个数据流环境,并从Socket读取数据流进行处理。处理过程中,首先解析日志数据,判断是否在当天,并计算PV。最后,使用JdbcSink将结果写入MySQL数据库中。
需要注意的是,JdbcSink需要引入以下依赖:
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.12.2</version>
</dependency>
```
阅读全文