package com.twq import org.apache.avro.generic.GenericData.StringType import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.DoubleType import org.apache.spark.{SparkConf, SparkContext} object FlightsDelayPredictor { def main(args: Array[String]): Unit = { //1.Spark应用的配置 val conf = new SparkConf() if (!conf.contains("spark.master")) { conf.setMaster("local[3]").setAppName("FlightsDelayPredictor") } val sc = new SparkContext(conf) sc.setLogLevel("Warn") val spark = SparkSession.builder() .config(conf) .getOrCreate() //提供很多隐式转换功能 //2.加载数据 var flightsDF = spark.read .option("header", "true") .csv("data/2008.csv") .select(col("Year").cast(StringType), //年 col("Month").cast(StringType), //月 col("DayofMonth").cast(StringType), //日 col("DayofWeek").cast(DoubleType), //星期 col("CRSDepTime").cast(StringType), //计划离开时间 col("CRSArrTime").cast(StringType), //预计到达时间 col("UniqueCarrier").cast(StringType), col("CRSElapsedTime").cast(DoubleType), col("ArrDelay").cast(DoubleType), col("DepDelay").cast(DoubleType), col("Origin").cast(StringType), col("Dest").cast(StringType), col("Distance").cast(DoubleType), col("TaxiOut").cast(DoubleType), val flightsDF: DataFrame = spark.read.format("csv").option("header", "true").load("path/to/flights.csv")flightsDF.show() flightsDF.show() ) } }
时间: 2024-02-21 13:56:57 浏览: 142
你的代码存在两个问题:
1. 缺少 `org.apache.spark.sql.functions._` 的 import 语句,导致 `col` 函数无法使用。你需要在代码中加入以下 import 语句:
```scala
import org.apache.spark.sql.functions._
```
2. 变量 `flightsDF` 的定义缺少一个括号。你需要将 `flightsDF` 的定义改为:
```scala
var flightsDF = spark.read
.option("header", "true")
.csv("data/2008.csv")
.select(
col("Year").cast(StringType), //年
col("Month").cast(StringType), //月
col("DayofMonth").cast(StringType), //日
col("DayofWeek").cast(DoubleType), //星期
col("CRSDepTime").cast(StringType), //计划离开时间
col("CRSArrTime").cast(StringType), //预计到达时间
col("UniqueCarrier").cast(StringType),
col("CRSElapsedTime").cast(DoubleType),
col("ArrDelay").cast(DoubleType),
col("DepDelay").cast(DoubleType),
col("Origin").cast(StringType),
col("Dest").cast(StringType),
col("Distance").cast(DoubleType),
col("TaxiOut").cast(DoubleType)
)
```
修改后的代码如下:
```scala
package com.twq
import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
object FlightsDelayPredictor {
def main(args: Array[String]): Unit = {
//1.Spark应用的配置
val conf = new SparkConf()
if (!conf.contains("spark.master")) {
conf.setMaster("local[3]").setAppName("FlightsDelayPredictor")
}
val sc = new SparkContext(conf)
sc.setLogLevel("Warn")
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
//提供很多隐式转换功能
//2.加载数据
var flightsDF = spark.read
.option("header", "true")
.csv("data/2008.csv")
.select(
col("Year").cast(StringType), //年
col("Month").cast(StringType), //月
col("DayofMonth").cast(StringType), //日
col("DayofWeek").cast(DoubleType), //星期
col("CRSDepTime").cast(StringType), //计划离开时间
col("CRSArrTime").cast(StringType), //预计到达时间
col("UniqueCarrier").cast(StringType),
col("CRSElapsedTime").cast(DoubleType),
col("ArrDelay").cast(DoubleType),
col("DepDelay").cast(DoubleType),
col("Origin").cast(StringType),
col("Dest").cast(StringType),
col("Distance").cast(DoubleType),
col("TaxiOut").cast(DoubleType)
)
flightsDF.show()
}
}
```
阅读全文