用flink sql实现mysql同步到kafka
时间: 2023-11-18 21:06:38 浏览: 191
1. 环境准备
- 安装 MySQL,创建测试数据库和表,并插入数据
- 安装 Kafka,并创建一个 topic
- 安装 Flink
2. 创建 Flink 项目
- 在 Flink 的 bin 目录下执行 flink new myflinkproject 创建一个新的 Flink 项目
- 在 pom.xml 中添加以下依赖
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
```
- 在 src/main/java 下创建一个 Java 类,例如 SyncMySQLToKafka.java
3. 编写 Flink SQL
在 SyncMySQLToKafka.java 中编写以下代码:
```
public class SyncMySQLToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE mysql_table (id INT, name STRING) " +
"WITH (" +
" 'connector.type' = 'jdbc'," +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf-8'," +
" 'connector.table' = 'test_table'," +
" 'connector.driver' = 'com.mysql.jdbc.Driver'," +
" 'connector.username' = 'root'," +
" 'connector.password' = 'root'" +
")");
tableEnv.executeSql("CREATE TABLE kafka_table (id INT, name STRING) " +
"WITH (" +
" 'connector.type' = 'kafka'," +
" 'connector.version' = 'universal'," +
" 'connector.topic' = 'test_topic'," +
" 'connector.properties.bootstrap.servers' = 'localhost:9092'," +
" 'connector.properties.group.id' = 'test_group'," +
" 'format.type' = 'json'," +
" 'update-mode' = 'append'" +
")");
tableEnv.executeSql("INSERT INTO kafka_table SELECT id, name FROM mysql_table");
env.execute();
}
}
```
- 创建一个 MySQL 表 mysql_table,指定连接信息和表名
- 创建一个 Kafka 表 kafka_table,指定连接信息、topic 和数据格式
- 将 mysql_table 中的数据插入到 kafka_table 中
4. 运行程序
- 在命令行中进入项目根目录,执行 mvn clean package 编译项目
- 执行以下命令运行程序
```
./bin/flink run -c SyncMySQLToKafka target/myflinkproject-1.0-SNAPSHOT.jar
```
5. 验证结果
- 在 Kafka 中查看是否有数据写入到 test_topic 中
- 修改 MySQL 表中的数据,查看是否能同步到 Kafka 中
以上就是使用 Flink SQL 实现 MySQL 同步到 Kafka 的简单示例。需要注意的是,本示例仅供参考,实际应用中需要根据具体需求进行修改和优化。
阅读全文