flink 输出到mysql demo
时间: 2023-07-18 21:39:51 浏览: 98
使用flink-connector-sqlserver-cdc 2.3.0把数据从SQL Server实时同步到MySQL中
5星 · 资源好评率100%
好的,我可以为您提供一个简单的 Flink 输出到 MySQL 的示例代码。首先,您需要在 Flink 项目中引入 MySQL 驱动程序的依赖项。这可以通过在 pom.xml 文件中添加以下代码来完成:
```
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
```
接下来,您可以使用 Flink 的 JDBCOutputFormat 类将数据写入 MySQL 数据库。以下是一个示例代码:
```
// 导入必要的类
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// 创建 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个包含数据的 DataStream
DataStream<Tuple2<String, Integer>> dataStream = ...
// 配置 MySQL 连接信息
String username = "your-username";
String password = "your-password";
String driverClassName = "com.mysql.jdbc.Driver";
String dbUrl = "jdbc:mysql://localhost:3306/your-database-name";
// 配置要写入的表信息
String tableName = "your-table-name";
String[] fieldNames = {"name", "count"};
JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(driverClassName)
.setDBUrl(dbUrl)
.setUsername(username)
.setPassword(password)
.setQuery("INSERT INTO " + tableName + " (name, count) VALUES (?, ?)")
.setSqlTypes(new int[]{Types.VARCHAR, Types.INTEGER})
.finish();
// 将数据写入 MySQL
dataStream.addSink(jdbcOutputFormat);
// 执行 Flink 任务
env.execute("Flink MySQL Demo");
```
这段代码将数据流中的每个 Tuple2<String, Integer> 对象插入到 MySQL 数据库中的指定表中。您可以根据需要修改此代码以符合您的数据格式和数据库结构。希望能帮助到您!
阅读全文