flink将数据保存到Mysql
时间: 2023-09-17 12:13:04 浏览: 94
要将 Flink 中的数据保存到 MySQL 数据库中,需要使用 Flink JDBC 连接器。下面是一个简单的示例代码,演示如何在 Flink 流处理程序中将数据写入 MySQL 数据库:
```java
DataStream<Tuple2<String, Integer>> dataStream = ...; //输入数据流
dataStream.addSink(
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test")
.setUsername("root")
.setPassword("password")
.setQuery("INSERT INTO my_table (word, count) VALUES (?,?)")
.setBatchInterval(1000) //每1000条数据批量写入一次
.finish()
);
```
这个示例代码假设输入数据流的每个元素都是一个包含单词和计数的二元组 (Tuple2<String, Integer>),并将它们写入名为 my_table 的 MySQL 表中。setQuery() 方法设置 SQL 插入语句,其中的 ? 表示占位符,可以在后续 setParameters() 方法中设置具体的值。
需要注意的是,如果要使用 JDBC 连接器,还需要在 Flink 的 classpath 中添加对相应 JDBC 驱动程序的依赖项。在本例中,需要添加 MySQL JDBC 驱动程序的依赖项。
阅读全文