spark-shell 根据MySQL的shtd_industry库中MachineData表,根据以下要求转换:获取数据后,首先解析列MachineRecordData的数据(数据格式为xml,采用dom4j解析),并获取每条数据的主轴转速,主轴倍率,主轴负载,进给倍率,进给速度,PMC程序号,循环时间,运行时间,有效轴数,总加工个数,已使用内存,未使用内存,可用程序量,注册程序量等相关的值(若该条数据没有相关值,则设置默认值为0.0),同时转换machine_record_state字段的值,若值为报警,则填写1,否则填写0,将数据保存在dwd.fact_machine_learning_data,使用cli按照machine_record_id升序排序,查询dwd.fact_machine_learning_data前1条数据
时间: 2024-03-01 14:53:21 浏览: 31
好的,根据你的要求,可以编写以下代码实现:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.dom4j.DocumentHelper
// Spark session for the whole job. Hive support is required because the
// script writes to and queries the managed table dwd.fact_machine_learning_data.
val spark = SparkSession.builder()
.appName("MachineDataTransform")
.enableHiveSupport()
.getOrCreate()
// Load the raw MachineData table from the shtd_industry MySQL database.
// NOTE(review): credentials are hard-coded here (root/root, localhost) —
// replace with job parameters before running outside a sandbox.
val mysqlOptions = Map(
  "url"      -> "jdbc:mysql://localhost:3306/shtd_industry",
  "driver"   -> "com.mysql.jdbc.Driver",
  "dbtable"  -> "MachineData",
  "user"     -> "root",
  "password" -> "root"
)
val jdbcDF = spark.read.format("jdbc").options(mysqlOptions).load()
// UDF: parse one MachineRecordData XML payload with dom4j and return the
// numeric text of the child element named `tagName` under the root element.
// Returns the required default 0.0 when the record is null/unparseable, the
// tag is absent, or its text is not a valid number.
val parseXml = udf((xmlStr: String, tagName: String) => {
  import scala.util.control.NonFatal
  try {
    val doc = DocumentHelper.parseText(xmlStr)
    val element = doc.getRootElement.element(tagName)
    // trim: XML element text frequently carries surrounding whitespace,
    // which would make toDouble throw and silently force the default.
    if (element != null) element.getText.trim.toDouble else 0.0
  } catch {
    // NonFatal (not Throwable): let OutOfMemoryError / InterruptedException
    // propagate instead of being silently converted into 0.0.
    case NonFatal(_) => 0.0
  }
})
// Derive one numeric column per metric from the MachineRecordData XML,
// then normalize machine_record_state: "报警" (alarm) -> 1, anything else -> 0.
// NOTE(review): the XML tag names below are assumed to match the payload's
// element names — confirm against an actual MachineRecordData sample.
val metricTags = Seq(
  "main_spindle_speed", "main_spindle_rate", "main_spindle_load",
  "feed_rate_rate", "feed_rate", "pmc_program_no", "cycle_time",
  "run_time", "effective_axis_count", "total_machining_count",
  "used_memory", "unused_memory", "available_program_count",
  "registered_program_count"
)
val parsedDF = metricTags
  .foldLeft(jdbcDF) { (df, tag) =>
    df.withColumn(tag, parseXml(col("MachineRecordData"), lit(tag)))
  }
  .withColumn("machine_record_state",
    when(col("machine_record_state") === "报警", 1).otherwise(0))
// Persist the transformed rows into the Hive DWD fact table.
// NOTE: insertInto resolves columns by POSITION, not by name, so this
// select order must match the dwd.fact_machine_learning_data schema exactly.
val outputCols = Seq(
  "machine_record_id", "main_spindle_speed", "main_spindle_rate",
  "main_spindle_load", "feed_rate_rate", "feed_rate", "pmc_program_no",
  "cycle_time", "run_time", "effective_axis_count", "total_machining_count",
  "used_memory", "unused_memory", "available_program_count",
  "registered_program_count", "machine_record_state"
)
parsedDF
  .select(outputCols.map(col): _*)
  .write
  .mode("overwrite")
  .insertInto("dwd.fact_machine_learning_data")
// Verification step required by the task: show the single row with the
// smallest machine_record_id from the freshly loaded fact table.
spark.table("dwd.fact_machine_learning_data")
  .orderBy(asc("machine_record_id"))
  .limit(1)
  .show()
```
希望这个代码对你有帮助!