spark-shell根据子任务一的结果,计算出与用户id为6708的用户所购买相同商品种类最多的前10位用户id(只考虑他俩购买过多少种相同的商品,不考虑相同的商品买了多少次),并根据Hudi(第一题为hive)的dwd库中相关表或MySQL数据库shtd_store中相关表,获取到这10位用户已购买过的商品,并剔除用户6708已购买的商品,通过计算这10位用户已购买的商品(剔除用户6708已购买的商品)与用户6708已购买的商品数据集中商品的余弦相似度累加再求均值,输出均值前5商品id作为推荐使用
时间: 2023-11-27 09:53:58 浏览: 141
假设子任务一的结果已经保存在了Hudi的dwd库中的某个表中,下面给出Spark-Shell的代码示例:
```scala
// 导入必要的库
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.sql.DataFrame
// 读取子任务一的结果
val subtask1DF = spark.read.table("dwd.subtask1_result")
// 筛选出与用户id为6708购买过相同商品的前10位用户id(不考虑相同商品购买次数)
val top10UserDF = subtask1DF.filter($"user_id" =!= 6708)
.groupBy($"user_id")
.agg(countDistinct($"product_id").as("common_count"))
.orderBy($"common_count".desc)
.limit(10)
// 获取这10位用户已购买过的商品
val productsDF = spark.read.jdbc(
url = "jdbc:mysql://localhost:3306/shtd_store",
table = "products",
properties = Map("user" -> "username", "password" -> "password")
)
val top10ProductsDF = top10UserDF.join(subtask1DF, Seq("user_id"))
.select($"product_id")
.distinct()
val user6708ProductsDF = subtask1DF.filter($"user_id" === 6708)
.select($"product_id")
.distinct()
val recommendedProductsDF = top10ProductsDF.join(productsDF, Seq("product_id"))
.except(user6708ProductsDF.join(productsDF, Seq("product_id")))
// 计算这10位用户已购买的商品与用户6708已购买的商品数据集中商品的余弦相似度
val hashingTF = new HashingTF().setInputCol("product_id").setOutputCol("rawFeatures")
val featurizedDF = subtask1DF.groupBy($"user_id").agg(collect_list($"product_id").as("product_list"))
.select($"user_id", hashingTF($"product_list").as("rawFeatures"))
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedDF)
val rescaledDF = idfModel.transform(featurizedDF).select($"user_id", $"features".as("user_features"))
val user6708Features = rescaledDF.filter($"user_id" === 6708).select($"user_features").head.getAs[SparseVector](0)
val dotProductUDF = udf((v1: Vector, v2: Vector) => v1.dot(v2))
val cosineSimilarityUDF = udf((v1: Vector, v2: Vector) => v1.dot(v2) / (v1.norm(2) * v2.norm(2)))
val similarityDF = rescaledDF.filter($"user_id" =!= 6708)
.withColumn("similarity", cosineSimilarityUDF($"user_features", lit(user6708Features)))
.select($"user_id", $"similarity")
// 获取均值前5的商品id
val recommendedProducts = recommendedProductsDF.select($"product_id")
.collect()
.map(_.getString(0))
val top5Products = subtask1DF.filter($"product_id".isin(recommendedProducts:_*))
.groupBy($"product_id")
.agg(sum($"quantity").as("total_quantity"))
.orderBy($"total_quantity".desc)
.limit(5)
.select($"product_id")
.collect()
.map(_.getString(0))
```
上述代码中,我们先读取了子任务一的结果,并筛选出与用户id为6708购买过相同商品的前10位用户id。然后通过Hudi的dwd库中的products表或MySQL数据库shtd_store中的products表获取这10位用户已购买过的商品,并剔除用户6708已购买的商品。接着,我们将用户购买的商品列表转化为TF-IDF向量,并计算余弦相似度,得到每个用户与用户6708的相似度。最后,我们根据相似度计算出均值前5的商品id作为推荐使用。