flink通过flinkcdc使用streamapi方式连接tidb
时间: 2023-09-02 18:14:06 浏览: 175
Apache Flink是一个分布式处理流式数据的开源框架,而TiDB是一个分布式SQL数据库。在Flink中使用FlinkCDC连接TiDB可以使用两种方式:使用FlinkCDC的Table API或使用FlinkCDC的Stream API。
使用FlinkCDC的Stream API连接TiDB的步骤如下:
1. 在TiDB中创建一个测试表并插入一些数据:
```
CREATE DATABASE test;
USE test;
CREATE TABLE user (
id INT PRIMARY KEY,
name VARCHAR(20),
age INT
);
INSERT INTO user VALUES (1, 'Alice', 20), (2, 'Bob', 30);
```
2. 在Flink中引入TiDB和FlinkCDC的相关依赖:
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-tidb_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-connectors_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
```
3. 编写Flink程序,使用FlinkCDC的Stream API连接TiDB:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.functions.source.TimestampedSourceFunction;
import org.apache.flink.streaming.connectors.tidb.TableId;
import org.apache.flink.streaming.connectors.tidb.TiDBOptions;
import org.apache.flink.streaming.connectors.tidb.TiDBSource;
import org.apache.flink.streaming.connectors.tidb.TiDBTableSource;
import org.apache.flink.types.Row;
import java.util.Properties;
public class TiDBStreamAPIExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty(TiDBOptions.TIDB_ADDRESS.key(), "localhost:4000");
properties.setProperty(TiDBOptions.TIDB_PASSWORD.key(), "");
properties.setProperty(TiDBOptions.TIDB_USER.key(), "root");
TiDBTableSource tableSource = TiDBSource
.builder()
.setDatabaseName("test")
.setTableName("user")
.setProperties(properties)
.build();
env.addSource(tableSource).print();
env.execute("TiDB Stream API Example");
}
}
```
4. 运行程序,可以看到从TiDB中读取到的数据:
```
1,Alice,20
2,Bob,30
```
以上就是使用FlinkCDC的Stream API连接TiDB的步骤。
阅读全文