Java code for a Flink CDC program to capture data from an Oracle standby database created with Data Guard
Below is a Java example that uses the Flink CDC Oracle connector (OracleSource from flink-connector-oracle-cdc) to capture change data from an Oracle standby database maintained by Data Guard and forward it to Kafka:
```java
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class OracleDataGuardCDC {

    private static final String KAFKA_TOPIC = "test_topic";
    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    // Connection details of the standby database maintained by Data Guard
    private static final String ORACLE_HOST = "localhost";
    private static final int ORACLE_PORT = 1521;
    private static final String ORACLE_DATABASE = "ORCL";
    private static final String ORACLE_USERNAME = "username";
    private static final String ORACLE_PASSWORD = "password";
    private static final String ORACLE_SCHEMA_NAME = "test_schema";
    private static final String ORACLE_TABLE_NAME = "test_table";

    public static void main(String[] args) throws Exception {
        // Oracle CDC source (Debezium/LogMiner under the hood); change events
        // are deserialized into JSON strings
        SourceFunction<String> oracleSource = OracleSource.<String>builder()
                .hostname(ORACLE_HOST)
                .port(ORACLE_PORT)
                .database(ORACLE_DATABASE)
                .schemaList(ORACLE_SCHEMA_NAME)
                .tableList(ORACLE_SCHEMA_NAME + "." + ORACLE_TABLE_NAME)
                .username(ORACLE_USERNAME)
                .password(ORACLE_PASSWORD)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the CDC source restart from its last committed position
        env.enableCheckpointing(3000);

        DataStream<String> changeStream =
                env.addSource(oracleSource, "Oracle DataGuard CDC Source");

        // Forward the JSON change events to Kafka and also print them for debugging
        changeStream.addSink(new FlinkKafkaProducer<>(
                KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, new SimpleStringSchema()));
        changeStream.print();

        env.execute("Oracle DataGuard CDC");
    }
}
```
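Each record emitted by the source above is a Debezium-style JSON envelope with "before", "after", "op", and "source" fields. If only the post-change row image should reach Kafka, a small flat-map can strip the envelope first. The following is a minimal sketch assuming the default, schema-less output of JsonDebeziumDeserializationSchema; the class name ExtractAfterImage is purely illustrative:
```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Emits only the "after" image of each change event; delete events have no
// "after" field and are skipped here.
public class ExtractAfterImage implements FlatMapFunction<String, String> {

    private transient ObjectMapper mapper;

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper(); // created lazily on the task manager
        }
        JsonNode after = mapper.readTree(value).get("after");
        if (after != null && !after.isNull()) {
            out.collect(after.toString());
        }
    }
}
```
It would be applied as changeStream.flatMap(new ExtractAfterImage()) before the Kafka sink.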
The program above uses the Flink CDC Oracle connector, which wraps Debezium's LogMiner-based capture, to read change data from the standby and write the JSON change events to Kafka. It needs the flink-connector-oracle-cdc and flink-connector-kafka dependencies on the classpath, and, as with any Debezium-based Oracle capture, the database must run in ARCHIVELOG mode with supplemental logging enabled. Point the hostname at the standby maintained by Data Guard and adjust the credentials, schema, and table names to your environment. An equivalent pure-SQL version with the Table API is sketched below.
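This sketch assumes a source table test_schema.test_table with columns ID and NAME; because the CDC source produces a changelog (inserts, updates, and deletes), it writes to the upsert-kafka connector rather than the append-only kafka connector:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleDataGuardCdcSql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // fault tolerance for the streaming job
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // CDC source table backed by the oracle-cdc connector; hostname points at the standby
        tableEnv.executeSql(
                "CREATE TABLE oracle_source (" +
                "  ID INT," +
                "  NAME STRING," +
                "  PRIMARY KEY (ID) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'oracle-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '1521'," +
                "  'username' = 'username'," +
                "  'password' = 'password'," +
                "  'database-name' = 'ORCL'," +
                "  'schema-name' = 'test_schema'," +
                "  'table-name' = 'test_table'" +
                ")");

        // Changelog-capable Kafka sink (upsert-kafka requires a primary key)
        tableEnv.executeSql(
                "CREATE TABLE kafka_sink (" +
                "  ID INT," +
                "  NAME STRING," +
                "  PRIMARY KEY (ID) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'test_topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // Submits the continuous job; await() keeps a local mini-cluster running
        tableEnv.executeSql("INSERT INTO kafka_sink SELECT ID, NAME FROM oracle_source").await();
    }
}
```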