spark-shell: Using the `dwd.fact_machine_data` table (or the `MachineData` table in MySQL's `shtd_industry` database), transform the data as follows. After fetching the data from the latest partition (not applicable to MySQL), first parse the `machine_record_data` column (`MachineRecordData` in MySQL), whose content is XML; use dom4j for parsing (a parsing demo is available on the client under `/home/ubuntu/Documents`). Extract each record's spindle speed, spindle override, spindle load, feed override, feed speed, PMC program number, cycle time, run time, effective axis count, total machined parts, used memory, free memory, available program count, registered program count, and related values (if a record lacks a value, use the default from the table below). At the same time, convert the `machine_record_state` field: write 1 if the value is 报警 (alarm), otherwise 0. The table structure is given below. Save the result to `dwd.fact_machine_learning_data`, then use the CLI to query the first row of `dwd.fact_machine_learning_data` ordered by `machine_record_id` ascending.
Posted: 2023-10-27 17:07:01 · Views: 315
In `spark-shell`, the above requirements can be completed with the following steps:
1. Read the `MachineData` table from MySQL (or the `fact_machine_data` table in the dwd database, filtered to its latest partition), reading the `machine_record_data` column as a string.
```scala
import java.util.Properties
import org.apache.spark.sql.functions._

// url and connectionProperties are assumed to point at MySQL's shtd_industry
// database, e.g. "jdbc:mysql://host:3306/shtd_industry" plus user/password.
// Note: do NOT sort or limit here -- every record must be transformed and
// saved; the ORDER BY ... LIMIT 1 happens only in the final CLI query.
val machineData = spark.read.jdbc(url, "MachineData", connectionProperties)
  .select(col("machine_record_id"),
          col("machine_record_data").cast("string"),
          col("machine_record_state"))
```
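Step 1 only shows the MySQL path. If reading from the Hive table `dwd.fact_machine_data` instead, the latest partition has to be selected explicitly. A minimal sketch, assuming the partition column is named `etl_date` (a hypothetical name; check the real partition column with `SHOW PARTITIONS dwd.fact_machine_data`):

```scala
import org.apache.spark.sql.functions._

// Find the newest partition value; each SHOW PARTITIONS row looks like "etl_date=20231026".
val maxPartition = spark.sql("SHOW PARTITIONS dwd.fact_machine_data")
  .collect()
  .map(_.getString(0).split("=")(1))
  .max

// Keep only the latest partition, same three columns as the MySQL path.
val machineData = spark.table("dwd.fact_machine_data")
  .filter(col("etl_date") === maxPartition)
  .select(col("machine_record_id"),
          col("machine_record_data").cast("string"),
          col("machine_record_state"))
```

Either way, the resulting `machineData` DataFrame feeds into step 2 unchanged.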
2. Use `dom4j` to parse the XML in the `machine_record_data` column, extract the required values (falling back to the defaults when a tag is absent), and convert the `machine_record_state` field at the same time.
```scala
import org.dom4j.{DocumentHelper, Element}
import spark.implicits._  // needed for .map over a DataFrame and .toDF

// Read a child element's text as Int, falling back to the default when the tag is absent.
def intOr(root: Element, tag: String, default: Int = 0): Int =
  Option(root.elementText(tag)).map(_.trim.toInt).getOrElse(default)

val parsedData = machineData.map(row => {
  val root = DocumentHelper.parseText(row.getAs[String]("machine_record_data")).getRootElement
  // Per the requirement: 1 when the state is 报警 (alarm), otherwise 0.
  val machineRecordState = if (row.getAs[String]("machine_record_state") == "报警") 1 else 0
  (row.getAs[Int]("machine_record_id"),
    intOr(root, "MainSpindleSpeed"),
    intOr(root, "MainSpindleMultiplier"),
    intOr(root, "MainSpindleLoad"),
    intOr(root, "FeedMultiplier"),
    intOr(root, "FeedSpeed"),
    Option(root.elementText("PmcProgramNumber")).getOrElse(""),
    intOr(root, "CycleTime"),
    intOr(root, "RunTime"),
    intOr(root, "EffectiveAxisCount"),
    intOr(root, "TotalMachinedParts"),
    intOr(root, "UsedMemory"),
    intOr(root, "FreeMemory"),
    intOr(root, "AvailableProgramCount"),
    intOr(root, "RegisteredProgramCount"),
    machineRecordState)
}).toDF("machine_record_id", "main_spindle_speed", "main_spindle_multiplier",
  "main_spindle_load", "feed_multiplier", "feed_speed", "pmc_program_number",
  "cycle_time", "run_time", "effective_axis_count", "total_machined_parts",
  "used_memory", "free_memory", "available_program_count",
  "registered_program_count", "machine_record_state")
```
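Before running the full job, the null-to-default logic can be sanity-checked on a single XML string directly in `spark-shell` (assuming the dom4j jar is on the classpath and the element names used above actually match the real data; verify them against the demo under `/home/ubuntu/Documents`):

```scala
import org.dom4j.DocumentHelper

val sample = "<Record><MainSpindleSpeed>1200</MainSpindleSpeed></Record>"
val r = DocumentHelper.parseText(sample).getRootElement

// elementText returns null for a missing child, so the Option fallback kicks in.
println(Option(r.elementText("MainSpindleSpeed")).map(_.toInt).getOrElse(0)) // 1200
println(Option(r.elementText("CycleTime")).map(_.toInt).getOrElse(0))        // 0
```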
3. Save the transformed data into the `dwd.fact_machine_learning_data` table. Since the target lives in the dwd database (Hive), write through the Hive catalog rather than JDBC:
```scala
parsedData.write.mode("overwrite").saveAsTable("dwd.fact_machine_learning_data")
```
4. Query the first row of `dwd.fact_machine_learning_data` from the CLI (e.g. `spark-sql` or `hive`):
```sql
SELECT * FROM dwd.fact_machine_learning_data ORDER BY machine_record_id ASC LIMIT 1;
```