我在用flink 版 sedona ,连接postgis 和oracle 转 postgis 代码怎么写
时间: 2024-05-14 16:14:45 浏览: 90
oracle-to-postgres
以下示例基于 Spark 版 Sedona(GeoSpark)演示从 PostGIS 和 Oracle 读取空间数据并做空间连接。注意:Sedona 的 Flink 版(sedona-flink)API 与此不同,若必须在 Flink 中运行,需改用 Flink Table API 配合 JDBC Connector,本示例仅供参考思路:
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.sedona.core.enums.{FileDataSplitter, GridType}
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.core.spatialRDD.LineStringRDD
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoSerializerInstance
import org.datasyslab.geospark.enums.{FileDataSplitter, GridType}
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
import org.datasyslab.geospark.spatialRDD.LineStringRDD
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geosparkviz.core.Serde.SedonaVizKryoRegistrator
import org.datasyslab.geosparkviz.extension.visualizationEffect.{ChoroplethMap, ScatterPlot}
import org.datasyslab.geosparkviz.sql.utils.{Adapter, GeoSparkVizRegistrator}
import org.datasyslab.geosparkviz.utils.{ColorizeOption, ImageType}
import org.datasyslab.geosparkviz.{MapConfig, RendererType}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.jdbc.JDBCDataStore
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.filter.Filter
/**
 * Reads LineString geometries from PostGIS and Oracle over JDBC (as WKT),
 * converts both sides into GeoSpark/Sedona spatial RDDs, runs a flat
 * spatial join, and prints the result.
 *
 * NOTE(review): despite the page title this is the Spark-based Sedona API,
 * not sedona-flink — a true Flink pipeline would use the Flink Table API.
 */
object SedonaPostGISOracle {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SedonaPostGISOracle")
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
      .config("spark.kryoserializer.buffer.max", "512m")
      .config("spark.driver.maxResultSize", "4g")
      .getOrCreate()
    // Registers the viz SQL functions; ST_GeomFromWKT below additionally
    // needs the core SQL functions registered — confirm GeoSparkSQLRegistrator
    // is also invoked in your Sedona/GeoSpark version if parsing fails.
    GeoSparkVizRegistrator.registerAll(sparkSession)

    // DataFrameReader.jdbc requires java.util.Properties — a HashMap does not
    // compile against that signature (this was a bug in the original snippet).
    val postGISUrl = "jdbc:postgresql://localhost:5432/postgis"
    val postGISProps = new java.util.Properties()
    postGISProps.setProperty("user", "postgres")
    postGISProps.setProperty("password", "postgres")
    postGISProps.setProperty("driver", "org.postgresql.Driver")

    val oracleUrl = "jdbc:oracle:thin:@localhost:1521:ORCL"
    val oracleProps = new java.util.Properties()
    oracleProps.setProperty("user", "oracle")
    oracleProps.setProperty("password", "oracle")
    oracleProps.setProperty("driver", "oracle.jdbc.driver.OracleDriver")

    // Pull geometries as WKT text so they survive the plain-JDBC transfer.
    val postGISQuery =
      "(select st_astext(geom) as wkt from public.linestringtable) as linestringtable"
    val postGISLineStringDF = sparkSession.read.jdbc(postGISUrl, postGISQuery, postGISProps)
    postGISLineStringDF.createOrReplaceTempView("postGISLineStringTable")

    // Oracle SDO_GEOMETRY cannot be read directly over JDBC — convert it to
    // WKT server-side with SDO_UTIL.TO_WKTGEOMETRY.
    val oracleQuery =
      "(select sdo_util.to_wktgeometry(geom) as wkt from LINESTRINGTABLE) linestringtable"
    val oracleLineStringDF = sparkSession.read.jdbc(oracleUrl, oracleQuery, oracleProps)
    oracleLineStringDF.createOrReplaceTempView("oracleLineStringTable")

    // Parse WKT into geometry columns via Sedona SQL. The original snippet
    // called ShapefileReader.readToLineStringRDD(df.rdd, ...), but that API
    // only reads shapefiles from a filesystem path — it cannot consume a
    // DataFrame's RDD.
    val postGISGeomDF =
      sparkSession.sql("select ST_GeomFromWKT(wkt) as geom from postGISLineStringTable")
    val oracleGeomDF =
      sparkSession.sql("select ST_GeomFromWKT(wkt) as geom from oracleLineStringTable")

    val postGISLineStringRDD = Adapter.toSpatialRdd(postGISGeomDF, "geom")
    val oracleLineStringRDD = Adapter.toSpatialRdd(oracleGeomDF, "geom")

    // GeoSpark requires analyze() + spatial partitioning before any join
    // query; both sides must share the dominant side's partitioner.
    postGISLineStringRDD.analyze()
    oracleLineStringRDD.analyze()
    postGISLineStringRDD.spatialPartitioning(GridType.KDBTREE)
    oracleLineStringRDD.spatialPartitioning(postGISLineStringRDD.getPartitioner)

    // Flat join: (useIndex = false, considerBoundaryIntersection = false).
    val joinResultPairRDD =
      JoinQuery.SpatialJoinQueryFlat(postGISLineStringRDD, oracleLineStringRDD, false, false)
    val joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession)
    joinResultDf.show()

    sparkSession.stop()
  }
}
```
注意:请将代码中的连接信息(URL、用户名、密码、表名)替换为你自己的实际配置,并确保 classpath 中包含 PostgreSQL 与 Oracle 的 JDBC 驱动依赖。
阅读全文