用spark-shell操作hive时只能连接到default数据库
时间: 2023-04-20 07:00:57 浏览: 126
您好,使用spark-shell操作hive时只能连接到default数据库可能是因为您没有指定其他数据库的连接信息。您可以在连接hive时使用以下命令指定其他数据库的连接信息:
```
spark.sql("use your_database_name")
```
其中,your_database_name是您要连接的数据库名称。这样,您就可以在spark-shell中操作其他数据库了。
相关问题
抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令
以下是抽取增量数据进入Hudi的代码:
```scala
import org.apache.spark.sql.functions._
import org.apache.hudi.QuickstartUtils._
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "root")
dbProperties.setProperty("password", "root")
val user_df = spark.read.jdbc(jdbcUrl, "user_info", dbProperties)
val hudi_options = Map[String, String](
HoodieWriteConfig.TABLE_NAME -> "user_info",
HoodieWriteConfig.RECORDKEY_FIELD_OPT_KEY -> "id",
HoodieWriteConfig.PRECOMBINE_FIELD_OPT_KEY -> "operate_time",
HoodieWriteConfig.PARTITIONPATH_FIELD_OPT_KEY -> "etl_date",
HoodieWriteConfig.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.OPERATION_OPT_KEY -> "upsert",
HoodieWriteConfig.BULK_INSERT_SORT_MODE_OPT_KEY -> "GLOBAL_SORT",
HoodieWriteConfig.BULK_INSERT_INPUT_RECORDS_NUM_OPT_KEY -> "500",
HoodieWriteConfig.BULK_INSERT_PARALLELISM_OPT_KEY -> "2",
HoodieWriteConfig.FORMAT_OPT_KEY -> "org.apache.hudi",
HoodieWriteConfig.HIVE_SYNC_ENABLED_OPT_KEY -> "false",
HoodieWriteConfig.HIVE_DATABASE_OPT_KEY -> "default",
HoodieWriteConfig.HIVE_TABLE_OPT_KEY -> "user_info",
HoodieWriteConfig.HIVE_PARTITION_FIELDS_OPT_KEY -> "etl_date",
HoodieWriteConfig.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> "org.apache.hudi.hive.NonPartitionedExtractor",
HoodieWriteConfig.HOODIE_TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ"
)
val etl_date = java.time.LocalDate.now.minusDays(1).format(java.time.format.DateTimeFormatter.BASIC_ISO_DATE)
val hudi_df = user_df
.withColumn("etl_date", lit(etl_date))
.withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
.withColumn("operate_time_long", unix_timestamp(col("operate_time"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("create_time_long", unix_timestamp(col("create_time"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("increment_ts", greatest(col("operate_time_long"), col("create_time_long")))
.filter(col("increment_ts") >= unix_timestamp(lit(etl_date), "yyyyMMdd"))
.selectExpr("id", "username", "age", "gender", "create_time", "operate_time")
.repartition(2)
hudi_df.write
.format("org.apache.hudi")
.options(hudi_options)
.mode("append")
.save("hdfs://localhost:9000/user/hive/warehouse/ods_ds_hudi.db/user_info")
```
执行完毕后,可以在Hive中使用`show partitions ods_ds_hudi.user_info`命令查看分区情况。
hive on spark port
Hive on Spark是将Hive SQL查询引擎与Spark计算框架集成起来的一种方式。它通过将Hive中的查询转换为Spark作业来提高查询性能和扩展性。
要在Hive中使用Spark作为计算引擎,需要完成以下步骤:
1. 配置Hive和Spark:需要在Hive配置文件中设置Hive执行引擎为Spark,并配置Spark的相关参数。
2. 启动Spark:需要在Hive启动之前启动Spark,可以使用以下命令启动:
```
$ spark-submit --class org.apache.spark.deploy.ClientApp \
--master yarn \
--deploy-mode client \
--driver-memory 4g \
--executor-memory 2g \
--num-executors 20 \
--executor-cores 2 \
--queue default \
--name "Spark shell" \
/path/to/spark/examples/jars/spark-examples_2.11-2.3.1.jar
```
3. 启动Hive:启动Hive时,需要将Hive执行引擎设置为Spark。可以使用以下命令启动:
```
$ hive --hiveconf hive.execution.engine=spark
```
4. 创建Hive表:在Hive中创建表时,需要将表的存储格式设置为Spark。可以使用以下命令创建表:
```
CREATE TABLE mytable (key INT, value STRING)
STORED AS PARQUET
LOCATION '/user/hive/warehouse/mytable'
TBLPROPERTIES ('spark.sql.sources.provider'='parquet');
```
5. 运行查询:在Hive中运行查询时,查询将被转换为Spark作业。可以使用以下命令运行查询:
```
SELECT * FROM mytable WHERE key = 1;
```
以上是在Hive中使用Spark作为计算引擎的简要介绍,希望能对你有所帮助。
阅读全文