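2. From the relevant tables in the Hive dwd database, or from the product table (sku_info) in the MySQL shtd_store database, use spark-shell to fetch the six fields id, spu_id, price, weight, tm_id, and category3_id and preprocess the data: standardize price and weight (StandardScaler), one-hot encode spu_id, tm_id, and category3_id (set the value to 1 if the product belongs to that brand, otherwise 0), sort by id in ascending order, and print the first 10 columns of the first record on the cluster (field names need not be shown).
Time: 2023-11-27 19:49:16 Views: 640
The following code is written in Scala: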
```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.sql.functions._
// Read the relevant table from the Hive dwd database
val df = spark.sql("select id, spu_id, price, weight, tm_id, category3_id from dwd_table")
// Alternatively, read the product table from the MySQL shtd_store database
// val df = spark.read.jdbc(url, table, properties)
// Assemble price and weight into one vector column, then standardize it
val assembler = new VectorAssembler()
  .setInputCols(Array("price", "weight"))
  .setOutputCol("features")
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaled_features")
  .setWithStd(true)
  .setWithMean(true)
val assembledDF = assembler.transform(df)
val scaledDF = scaler.fit(assembledDF).transform(assembledDF)
// Index spu_id, tm_id, and category3_id, then one-hot encode the indices
val spuIndexer = new StringIndexer()
  .setInputCol("spu_id")
  .setOutputCol("spu_index")
  .setHandleInvalid("skip")
val tmIndexer = new StringIndexer()
  .setInputCol("tm_id")
  .setOutputCol("tm_index")
  .setHandleInvalid("skip")
val categoryIndexer = new StringIndexer()
  .setInputCol("category3_id")
  .setOutputCol("category_index")
  .setHandleInvalid("skip")
// dropLast defaults to true, which would leave the last category without its own 0/1 slot
val encoder = new OneHotEncoder()
  .setInputCols(Array("spu_index", "tm_index", "category_index"))
  .setOutputCols(Array("spu_encoded", "tm_encoded", "category_encoded"))
  .setDropLast(false)
// Run the indexers and the encoder as one pipeline so each stage sees the columns added by the previous one
val pipeline = new Pipeline()
  .setStages(Array[PipelineStage](spuIndexer, tmIndexer, categoryIndexer, encoder))
val oneHotDF = pipeline.fit(scaledDF).transform(scaledDF)
// Sort by id in ascending order and show the first 10 rows
val resultDF = oneHotDF.sort("id").limit(10).select("id", "spu_encoded", "tm_encoded", "category_encoded", "scaled_features")
resultDF.show(false)
```
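To sanity-check the values Spark returns, the two transformations can be reproduced by hand on a few rows. The sketch below is plain Scala with no Spark dependency. The rows, the helper names `standardize` and `oneHot`, and the choice to order one-hot slots by ascending id are all assumptions made for illustration; Spark's `StringIndexer` orders labels by descending frequency by default, so slot positions may differ from the Spark output.

```scala
// Toy rows: (id, price, tm_id). Values are made up for illustration.
val rows = Seq((1L, 10.0, 3L), (2L, 20.0, 1L), (3L, 30.0, 3L))

// Z-score using the sample standard deviation (divide by n - 1),
// which is the deviation Spark's StandardScaler is based on.
def standardize(xs: Seq[Double]): Seq[Double] = {
  val mean = xs.sum / xs.size
  val variance = xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1)
  val std = math.sqrt(variance)
  // Constant columns are mapped to 0.0 to avoid dividing by zero.
  xs.map(x => if (std == 0.0) 0.0 else (x - mean) / std)
}

// One-hot: one 0/1 slot per distinct value, slots ordered by ascending value
// (an assumption of this sketch, not Spark's default ordering).
def oneHot(values: Seq[Long]): Seq[Seq[Double]] = {
  val categories = values.distinct.sorted
  values.map(v => categories.map(c => if (c == v) 1.0 else 0.0))
}

val scaledPrice = standardize(rows.map(_._2))
val tmEncoded = oneHot(rows.map(_._3))

// Print each row as: id, scaled price, tm_id one-hot slots
rows.indices.foreach { i =>
  println((Seq(rows(i)._1.toDouble, scaledPrice(i)) ++ tmEncoded(i)).mkString(","))
}
```

The snippet can be pasted into spark-shell or a plain Scala REPL. Each printed row contains exactly one 1.0 among the tm_id slots, which is the "1 if the product belongs to that brand, otherwise 0" rule from the question.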
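Replace the following in the code:
1. `dwd_table`: the name of the relevant table in Hive;
2. `url`, `table`, `properties`: the connection details for the relevant MySQL table;
3. `Array("price", "weight")`: the names of the fields to standardize;
4. `Array("spu_index", "tm_index", "category_index")`: the index columns produced by the `StringIndexer` stages, which are the inputs to one-hot encoding;
5. `"skip"`: the strategy for invalid data (NULL values and labels not seen during fitting). The options are `"skip"`, `"error"`, and `"keep"`: drop the affected rows, throw an error, or put the invalid values in an extra bucket.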
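The question asks for the first 10 columns of the first record, while the code above shows 10 rows whose features are still packed into vector columns. A minimal sketch of one way to flatten the first row follows, assuming Spark 3.0 or later (for the multi-column `StringIndexer` and `OneHotEncoder`). The `sample` data is made up, and the column order (id, then the one-hot slots, then the scaled values) simply follows the `select` used above; whether that is the order the task expects is an assumption.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import spark.implicits._ // already in scope in spark-shell

// Hypothetical sample standing in for the sku_info data
val sample = Seq(
  (1L, 1L, 100.0, 1.0, 1L, 61L),
  (2L, 2L, 200.0, 2.0, 2L, 62L),
  (3L, 3L, 300.0, 3.0, 1L, 61L),
  (4L, 3L, 400.0, 4.0, 2L, 62L)
).toDF("id", "spu_id", "price", "weight", "tm_id", "category3_id")

val idCols = Array("spu_id", "tm_id", "category3_id")
val stages = Array[PipelineStage](
  new VectorAssembler().setInputCols(Array("price", "weight")).setOutputCol("features"),
  new StandardScaler().setInputCol("features").setOutputCol("scaled_features")
    .setWithStd(true).setWithMean(true),
  new StringIndexer().setInputCols(idCols).setOutputCols(idCols.map(_ + "_index")),
  // dropLast(false) keeps one 0/1 slot for every category
  new OneHotEncoder().setInputCols(idCols.map(_ + "_index"))
    .setOutputCols(idCols.map(_ + "_encoded")).setDropLast(false)
)
val out = new Pipeline().setStages(stages).fit(sample).transform(sample)

// Take the row with the smallest id and unpack its vector columns into plain doubles
val first = out.orderBy("id").head()
val flat: Seq[Double] =
  Seq(first.getAs[Long]("id").toDouble) ++
    idCols.toSeq.flatMap(c => first.getAs[Vector](c + "_encoded").toArray.toSeq) ++
    first.getAs[Vector]("scaled_features").toArray.toSeq

// Print the first 10 values, comma-separated, without field names
println(flat.take(10).mkString(","))
```

On the real table, the same last three statements could be applied to the transformed DataFrame instead of `out`. Note that the slot order inside each one-hot vector follows `StringIndexer`'s label ordering, which is by descending frequency unless `setStringOrderType` is changed.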