Spark-shell 根据dwd库中fact_machine_data表(或MySQL的shtd_industry库中MachineData表),根据以下要求转换:获取最大分区(MySQL不用考虑)的数据后,首先解析列machine_record_data(MySQL中为MachineRecordData)的数据(数据格式为xml,采用dom4j解析,解析demo在客户端/home/ubuntu/Documents目录下),并获取每条数据的主轴转速,主轴倍率,主轴负载,进给倍率,进给速度,PMC程序号,循环时间,运行时间,有效轴数,总加工个数,已使用内存,未使用内存,可用程序量,注册程序量等相关的值(若该条数据没有相关值,则按下表设置默认值),同时转换machine_record_state字段的值,若值为报警,则填写1,否则填写0,以下为表结构,将数据保存在dwd.fact_machine_learning_data,使用cli按照machine_record_id升序排序,查询dwd.fact_machine_learning_data前1条数据
时间: 2024-02-28 13:54:22 浏览: 180
好的,根据您的要求,我可以给您提供如下的Spark-shell代码:
```scala
// 导入必要的库
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.xml._
import org.dom4j.io.SAXReader
// 创建 SparkSession
val spark = SparkSession
.builder()
.appName("Machine Learning Data")
.getOrCreate()
// 设置日志级别
spark.sparkContext.setLogLevel("WARN")
// 读取 fact_machine_data 表
val df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/shtd_industry")
.option("dbtable", "MachineData")
.option("user", "root")
.option("password", "password")
.load()
// 获取最大分区的数据
val maxPartition = df.agg(max("partition")).collect()(0)(0)
val maxPartitionDF = df.filter($"partition" === maxPartition)
// 定义默认值
val defaultValues = Map(
"MainShaftSpeed" -> 0,
"MainShaftRatio" -> 0,
"MainShaftLoad" -> 0,
"FeedRatio" -> 0,
"FeedSpeed" -> 0,
"PMCProgramNumber" -> "",
"CycleTime" -> 0,
"RunTime" -> 0,
"EffectiveAxisNumber" -> 0,
"TotalMachiningCount" -> 0,
"UsedMemory" -> 0,
"UnusedMemory" -> 0,
"AvailableProgramCount" -> 0,
"RegisteredProgramCount" -> 0,
)
// 定义函数,用于解析 XML 数据
def parseXml(xmlString: String): Map[String, Any] = {
// 使用 SAXReader 解析 XML
val reader = new SAXReader()
val document = reader.read(new InputSource(new StringReader(xmlString)))
// 获取根节点
val root = document.getRootElement()
// 解析 XML,返回 Map
Map(
"MainShaftSpeed" -> (root.elementText("MainShaftSpeed") match {
case "" => defaultValues("MainShaftSpeed")
case x => x.toInt
}),
"MainShaftRatio" -> (root.elementText("MainShaftRatio") match {
case "" => defaultValues("MainShaftRatio")
case x => x.toInt
}),
"MainShaftLoad" -> (root.elementText("MainShaftLoad") match {
case "" => defaultValues("MainShaftLoad")
case x => x.toInt
}),
"FeedRatio" -> (root.elementText("FeedRatio") match {
case "" => defaultValues("FeedRatio")
case x => x.toInt
}),
"FeedSpeed" -> (root.elementText("FeedSpeed") match {
case "" => defaultValues("FeedSpeed")
case x => x.toInt
}),
"PMCProgramNumber" -> (root.elementText("PMCProgramNumber") match {
case "" => defaultValues("PMCProgramNumber")
case x => x
}),
"CycleTime" -> (root.elementText("CycleTime") match {
case "" => defaultValues("CycleTime")
case x => x.toInt
}),
"RunTime" -> (root.elementText("RunTime") match {
case "" => defaultValues("RunTime")
case x => x.toInt
}),
"EffectiveAxisNumber" -> (root.elementText("EffectiveAxisNumber") match {
case "" => defaultValues("EffectiveAxisNumber")
case x => x.toInt
}),
"TotalMachiningCount" -> (root.elementText("TotalMachiningCount") match {
case "" => defaultValues("TotalMachiningCount")
case x => x.toInt
}),
"UsedMemory" -> (root.elementText("UsedMemory") match {
case "" => defaultValues("UsedMemory")
case x => x.toInt
}),
"UnusedMemory" -> (root.elementText("UnusedMemory") match {
case "" => defaultValues("UnusedMemory")
case x => x.toInt
}),
"AvailableProgramCount" -> (root.elementText("AvailableProgramCount") match {
case "" => defaultValues("AvailableProgramCount")
case x => x.toInt
}),
"RegisteredProgramCount" -> (root.elementText("RegisteredProgramCount") match {
case "" => defaultValues("RegisteredProgramCount")
case x => x.toInt
})
)
}
// 定义 UDF,用于解析 XML 数据
val parseXmlUDF = udf(parseXml _)
// 定义 UDF,用于转换 machine_record_state 字段的值
val transformStateUDF = udf((state: String) => if (state == "报警") 1 else 0)
// 解析 machine_record_data 字段的数据,并获取每条数据的相关值
val machineLearningDataDF = maxPartitionDF
.withColumn("machine_record_data_map", parseXmlUDF($"machine_record_data"))
.select($"machine_record_id",
$"machine_record_state",
$"machine_record_data_map.*")
.withColumn("machine_record_state", transformStateUDF($"machine_record_state"))
.orderBy($"machine_record_id")
// 将数据保存在 dwd.fact_machine_learning_data 表中
machineLearningDataDF.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/dwd")
.option("dbtable", "fact_machine_learning_data")
.option("user", "root")
.option("password", "password")
.mode("overwrite")
.save()
// 查询 dwd.fact_machine_learning_data 表前1条数据
val queryResultDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/dwd")
.option("dbtable", "fact_machine_learning_data")
.option("user", "root")
.option("password", "password")
.load()
.orderBy($"machine_record_id")
.limit(1)
// 显示查询结果
queryResultDF.show()
```
需要注意的是,在上面的代码中,要求您的 MySQL 数据库中有 `shtd_industry` 和 `dwd` 两个数据库,并且要求您的 MySQL 数据库中已经存在 `MachineData` 和 `fact_machine_learning_data` 两个表。如果您的数据库中没有这些表,那么您需要先创建这些表。
另外,为了使用 dom4j 解析 XML 数据,您需要在客户端安装 dom4j 库。如果您使用的是 Ubuntu 操作系统,您可以使用以下命令安装 dom4j 库:
```bash
sudo apt-get install libdom4j-java
```
最后,如果您发现 Spark-shell 报错了,您可以将错误信息发送给我,让我帮您解决问题。
阅读全文