使用idea用spark抽取ods库sku_info表中的数据到dwd库dim_sku_info表中,字段为etl_date的值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用mysql cli查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date条件为id大于等于15且小于等于20,并且按照id升序排序
时间: 2023-08-11 19:06:07 浏览: 232
好的,这是一个关于Spark和MySQL的数据处理问题。以下是解决方案:
1. 使用Spark SQL抽取ods库中的sku_info表中的数据到DataFrame中,例如:
```scala
val skuInfoDF = spark.sql("SELECT * FROM ods.sku_info")
```
2. 对DataFrame进行数据清洗和转换,添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,并将etl_date的值与ods库的相对应表该值相等,例如:
```scala
import org.apache.spark.sql.functions._
val dimSkuInfoDF = skuInfoDF
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumnRenamed("sku_desc", "sku_name") // 修改列名
.withColumn("etl_date", date_format(lit("2021-01-01"), "yyyyMMdd")) // etl_date的值与ods库的相对应表该值相等
```
3. 将DataFrame中的数据写入MySQL的dim_sku_info表中,例如:
```scala
dimSkuInfoDF.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/dwd")
.option("dbtable", "dim_sku_info")
.option("user", "root")
.option("password", "123456")
.mode(SaveMode.Append)
.save()
```
4. 最后,使用MySQL的CLI查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date条件为id大于等于15且小于等于20,并且按照id升序排序,例如:
```sql
SELECT id, sku_name, dwd_insert_user, dwd_modify_time, etl_date
FROM dim_sku_info
WHERE id >= 15 AND id <= 20
ORDER BY id ASC;
```
以上就是解决方案,希望能够帮助到你!
阅读全文