添加pom文件实现Flink sink MySQl
时间: 2023-03-29 11:02:38 浏览: 182
可以通过在 Maven 项目的 pom.xml 文件中添加 Flink 的 MySQL Connector 依赖来实现 Flink sink MySQL。具体的依赖信息如下:
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.2</version>
</dependency>
```
在 Flink 程序中,可以通过创建一个 JdbcSink 来将数据写入到 MySQL 数据库中。具体的代码如下:
```
DataStream<Tuple2<String, Integer>> dataStream = ...;
JdbcSink sink = JdbcSink.sink(
"INSERT INTO my_table (name, count) VALUES (?, ?)",
(ps, t) -> {
ps.setString(1, t.f);
ps.setInt(2, t.f1);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/my_database")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("my_username")
.withPassword("my_password")
.build()
);
dataStream.addSink(sink);
```
以上代码中,dataStream 是一个包含 Tuple2<String, Integer> 类型数据的 Flink 数据流,JdbcSink.sink() 方法用于创建一个 JdbcSink,其中第一个参数是 SQL 语句,第二个参数是将 Tuple2 转换为 PreparedStatement 的函数,第三个参数是数据库连接信息。最后,通过 addSink() 方法将数据流写入到 MySQL 数据库中。
阅读全文