flink cdc postgresql
时间: 2023-09-03 10:06:44 浏览: 247
在使用Flink进行CDC(变更数据捕获)时,可以通过创建与PostgreSQL的映射表来实现数据的读取和写入。首先,在Flink SQL中创建与PostgreSQL的映射表,其中source表用于读取数据,sink表用于写入数据。创建源表的示例代码如下:
CREATE TABLE cdc_pg_source (
id INT,
age INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.1.88',
'port' = '5432',
'database-name' = 'postgres',
'schema-name' = 'public',
'username' = 'postgres',
'password' = 'postgres',
'table-name' = 'cdc_pg_source',
'decoding.plugin.name' = 'pgoutput',
'debezium.slot.name' = 'cdc_pg_source'
);
创建目标表的示例代码如下:
CREATE TABLE cdc_pg_sink (
id INT,
age INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.1.88:5432/postgres',
'username' = 'postgres',
'password' = 'postgres',
'table-name' = 'cdc_pg_sink',
'sink.buffer-flush.max-rows' = '1'
);
然后,可以将数据从source表插入到sink表中,以完成数据的写入操作。示例代码如下:
INSERT INTO cdc_pg_sink SELECT * FROM cdc_pg_source;
如果需要重新加载配置或重启PostgreSQL,可以使用以下命令:# docker exec -it 7b2d8a96ef4c /bin/bash
root@7b2d8a96ef4c:/# su postgres
postgres@7b2d8a96ef4c:/$ pg_ctl reload server signaled
希望以上信息对您有所帮助。<span class="em">1</span><span class="em">2</span><span class="em">3</span>
#### 引用[.reference_title]
- *1* *2* [flink cdc 连接 postgresql kafka](https://blog.csdn.net/hzyice/article/details/115484832)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 50%"]
- *3* [Flink Postgres CDC](https://blog.csdn.net/hyunbar/article/details/126002491)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 50%"]
[ .reference_list ]
阅读全文