Write Scala code that uses Flink's Table API to parse the JSON file "msg": "操作成功","code":200, "data":[{"machine":"q1232","showFactChangeRecordList":[{"changeID":112}]}]
Posted: 2023-12-30 16:03:05 | Views: 88
Below is an example that uses Scala and Flink's Table API to parse this JSON:
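```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.flink.table.api._

// Requires flink-table-planner, flink-connector-files and flink-json on the classpath.
object ParseJsonExample {
  def main(args: Array[String]): Unit = {
    // Unified batch-mode environment; the legacy BatchTableEnvironment is gone in newer Flink releases.
    val settings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // The 'json' format expects one JSON document per line, so the sample stays on a single line.
    val json =
      """{"msg":"操作成功","code":200,"data":[{"machine":"q1232","showFactChangeRecordList":[{"changeID":112}]}]}"""
    // Write the sample to a temporary file; replace this with the path of the real input file.
    val path = Files.createTempFile("response", ".json")
    Files.write(path, json.getBytes(StandardCharsets.UTF_8))

    // Declare the nested structure so the JSON format can map it to typed columns.
    tableEnv.executeSql(
      s"""
         |CREATE TABLE response (
         |  `msg` STRING,
         |  `code` INT,
         |  `data` ARRAY<ROW<`machine` STRING, `showFactChangeRecordList` ARRAY<ROW<`changeID` INT>>>>
         |) WITH (
         |  'connector' = 'filesystem',
         |  'path' = '${path.toUri}',
         |  'format' = 'json'
         |)
         |""".stripMargin)

    // Flatten both arrays: one output row per (machine, changeID) pair.
    val resultTable = tableEnv.sqlQuery(
      """
        |SELECT d.machine, r.changeID
        |FROM response
        |CROSS JOIN UNNEST(`data`) AS d (machine, records)
        |CROSS JOIN UNNEST(d.records) AS r (changeID)
        |""".stripMargin)

    resultTable.printSchema()
    resultTable.execute().print()
  }
}
```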
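To sanity-check the shape of the parsed result without a Flink cluster, the flattening can be modeled in plain Scala. This is only a sketch: the case classes `Response`, `MachineEntry` and `ChangeRecord` and the object `FlattenSketch` are hypothetical names introduced here, not part of Flink. It assumes the goal is one output row per `(machine, changeID)` pair, which in SQL terms is what unnesting both arrays yields.

```scala
// Hypothetical case classes mirroring the JSON structure from the question.
final case class ChangeRecord(changeID: Int)
final case class MachineEntry(machine: String, showFactChangeRecordList: List[ChangeRecord])
final case class Response(msg: String, code: Int, data: List[MachineEntry])

object FlattenSketch {
  // Produce one (machine, changeID) pair per nested change record.
  def flatten(response: Response): List[(String, Int)] =
    for {
      entry  <- response.data
      record <- entry.showFactChangeRecordList
    } yield (entry.machine, record.changeID)

  // The sample document from the question, built by hand instead of parsed.
  val sample: Response = Response(
    msg = "操作成功",
    code = 200,
    data = List(MachineEntry("q1232", List(ChangeRecord(112))))
  )

  def main(args: Array[String]): Unit =
    flatten(sample).foreach { case (machine, changeID) =>
      println(s"$machine\t$changeID")
    }
}
```

For the sample document, `FlattenSketch.flatten(FlattenSketch.sample)` returns a single pair, `("q1232", 112)`. A machine with an empty `showFactChangeRecordList` contributes no rows, which matches inner-join semantics; if such machines must be kept, the flattening would need a left-join style variant.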