spark-shell 根据MySQL的shtd_industry库中MachineData表,根据以下要求转换:获取数据后,首先解析列MachineRecordData的数据(数据格式为xml,采用dom4j解析),并获取每条数据的主轴转速,主轴倍率,主轴负载,进给倍率,进给速度,PMC程序号,循环时间,运行时间,有效轴数,总加工个数,已使用内存,未使用内存,可用程序量,注册程序量等相关的值(若该条数据没有相关值,则设置默认值为0.0),同时转换machine_record_state字段的值,若值为报警,则填写1,否则填写0,将数据保存在dwd.fact_machine_learning_data,使用cli按照machine_record_id升序排序,查询dwd.fact_machine_learning_data前1条数据
时间: 2024-03-02 07:48:47 浏览: 148
好的,根据您的要求,我可以给您提供以下的 spark-shell 代码,可以实现您的需求:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.dom4j._
import org.dom4j.io.SAXReader
import scala.util.Try
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("MachineLearningDataConversion")
.getOrCreate()
// 读取 MySQL 中的 MachineData 表
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hostname:port/shtd_industry")
.option("dbtable", "MachineData")
.option("user", "username")
.option("password", "password")
.load()
// 定义 UDF 解析 MachineRecordData 列的 xml 数据
val parseXml = udf((xmlData: String, key: String) => {
Try {
val reader = new SAXReader()
val document = reader.read(new InputSource(new StringReader(xmlData)))
document.getRootElement.element(key).getText.toDouble
}.getOrElse(0.0)
})
// 转换数据集
val transformedDF = jdbcDF.select(
$"MachineRecordId", $"MachineRecordData",
parseXml($"MachineRecordData", lit("MainAxisRotationalSpeed")).as("MainAxisRotationalSpeed"),
parseXml($"MachineRecordData", lit("MainAxisMultiplier")).as("MainAxisMultiplier"),
parseXml($"MachineRecordData", lit("MainAxisLoad")).as("MainAxisLoad"),
parseXml($"MachineRecordData", lit("FeedMultiplier")).as("FeedMultiplier"),
parseXml($"MachineRecordData", lit("FeedSpeed")).as("FeedSpeed"),
parseXml($"MachineRecordData", lit("PMCProgramNo")).as("PMCProgramNo"),
parseXml($"MachineRecordData", lit("CycleTime")).as("CycleTime"),
parseXml($"MachineRecordData", lit("RunTime")).as("RunTime"),
parseXml($"MachineRecordData", lit("EffectiveAxisNum")).as("EffectiveAxisNum"),
parseXml($"MachineRecordData", lit("ProcessedTotalNum")).as("ProcessedTotalNum"),
parseXml($"MachineRecordData", lit("UsedMemory")).as("UsedMemory"),
parseXml($"MachineRecordData", lit("UnusedMemory")).as("UnusedMemory"),
parseXml($"MachineRecordData", lit("ProgramNum")).as("ProgramNum"),
parseXml($"MachineRecordData", lit("RegisteredProgramNum")).as("RegisteredProgramNum"),
when($"MachineRecordState" === "报警", 1).otherwise(0).as("MachineRecordState")
)
.transform(df => df.na.fill(0.0))
// 将转换后的数据保存到 dwd.fact_machine_learning_data 表中
transformedDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hostname:port/dwd")
.option("dbtable", "fact_machine_learning_data")
.option("user", "username")
.option("password", "password")
.mode("overwrite")
.save()
// 查询 dwd.fact_machine_learning_data 前1条数据
val resultDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hostname:port/dwd")
.option("dbtable", "fact_machine_learning_data")
.option("user", "username")
.option("password", "password")
.load()
.orderBy("MachineRecordId")
.limit(1)
resultDF.show()
```
需要注意的是,您需要将代码中的 `hostname`、`port`、`username` 和 `password` 替换为实际的 MySQL 服务器地址、端口、用户名和密码。
阅读全文