使用Java编写一个Flink程序,实时采集Oracle的数据,然后输出到PostgreSQL
时间: 2024-05-11 16:15:49 浏览: 237
Flink实时同步Oracle数据到Doris
我明白了,您的要求是使用Java编写一个Flink程序,实时采集Oracle的数据,并将其输出到PostgreSQL。这个可以用Flink的JDBC Connector轻松完成。您需要先添加Flink的JDBC Connector依赖以及Oracle和PostgreSQL的JDBC驱动,然后在代码中使用示例里定义的OracleSourceFunction读取Oracle的数据,再使用PostgresSinkFunction将数据写入PostgreSQL。您可以参考以下代码:
```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedSourceFunction;
import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.connectors.jdbc.JdbcTableSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcTableSource;
import org.apache.flink.streaming.connectors.jdbc.JdbcUpsertTableSink;
import org.apache.flink.streaming.connectors.jdbc.MySQLUpsertTableSink;
import org.apache.flink.streaming.connectors.jdbc.TableUtils;
import org.apache.flink.streaming.connectors.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Optional;
import java.util.Properties;
public class OracleToFlinkToPostgreSQLDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
// read Oracle data
SourceFunction<Row> oracleSource = new OracleSourceFunction("jdbc:oracle:thin:@ip:port:sid","user","password","SELECT * FROM source_table");
// TimestampedSourceFunction<Row> timestampedSource = new OracleSourceFunction("jdbc:oracle:thin:@ip:port:sid","user","password","SELECT event_time, field1, field2 FROM source_table")
// .assignTimestampsAndWatermarks(r -> new Timestamp(r.getTimestamp("event_time").getTime()));
// env.addSource(timestampedSource)
env.addSource(oracleSource)
// transform data
.map(new MapFunction<Row, String>() {
@Override
public String map(Row value) throws Exception {
return value.toString();
}
})
// write PostgreSQL data
.addSink(new PostgresSinkFunction("jdbc:postgresql://ip:port/database","user","password","target_table"));
env.execute("OracleToFlinkToPostgreSQL Demo");
}
private static class OracleSourceFunction extends JdbcTableSource {
public OracleSourceFunction(String url, String user, String password, String query) throws SQLException {
super(JdbcConnectionOptions.builder()
.withUrl(url)
.withUsername(user)
.withPassword(password)
.build(),
Optional.empty(),
query,
TypeInformation.of(Row.class));
}
@Override
public String getTableName() {
return "source_table";
}
}
private static class PostgresSinkFunction implements SinkFunction<String> {
private final String url;
private final String user;
private final String password;
private final String query;
public PostgresSinkFunction(String url, String user, String password, String table) {
this.url = url;
this.user = user;
this.password = password;
this.query = TableUtils.getUpsertStatement(table,
new String[]{"field1", "field2"},
new String[]{"?","?"});
}
@Override
public void invoke(String value) throws Exception {
try (JdbcOutputFormat outputFormat = JdbcOutputFormat.buildJdbcOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl(url)
.setUsername(user)
.setPassword(password)
.setQuery(query)
.finish()) {
outputFormat.writeRecord(Row.of(value.split(",",2))); // 需要把逗号分割的值转成两个字段
}
}
}
}
```
此代码演示了如何通过JDBC读取Oracle表中的数据,并将其写入PostgreSQL中的另一个表中。在代码中,您需要将URL、用户名、密码、查询和目标表的名称替换为实际的值。此外,您还需要确保运行环境的类路径中包含Oracle和PostgreSQL的JDBC驱动程序。
阅读全文