def meanAge(dataFrame: DataFrame): Double = { dataFrame .select("Age") .na.drop() .agg(round(mean("Age"), 0)) .first() .getDouble(0) }
时间: 2024-03-31 11:37:45 浏览: 41
这是一个 Scala 函数,输入参数为一个 Spark DataFrame,输出为一个 Double 类型的平均年龄值。这个函数首先从 DataFrame 中选择 "Age" 列,然后使用 `na.drop()` 方法删除包含空值的行。接着,它使用 `agg` 方法计算 "Age" 列的平均值,并使用 `round` 函数将结果四舍五入为整数。最后,它使用 `first` 方法获取计算结果的第一行(因为 `agg` 方法的返回结果是一个 DataFrame),并使用 `getDouble(0)` 方法获取该行的第一个值,即平均年龄值。
相关问题
代码如下: import breeze.numerics.round import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DoubleType, IntegerType} import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.DataFrame object Titanic_c { def main(args: Array[String]) = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName("Titanic_c").setMaster("local[2]") val sc = new SparkContext(conf) val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Titanic") .getOrCreate; val df = spark.read .format("csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("datasets/Titanic_s.csv") import spark.implicits._ df.withColumn("Pclass", df("Pclass").cast(IntegerType)) .withColumn("Survived", df("Survived").cast(IntegerType)) .withColumn("Age", df("Age").cast(DoubleType)) .withColumn("SibSp", df("SibSp").cast(IntegerType)) .withColumn("Parch", df("Parch").cast(IntegerType)) .withColumn("Fare", df("Fare").cast(DoubleType)) val df1 = df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin") val columns = df1.columns val missing_cnt = columns.map(x => df1.select(col(x)).where(col(x).isNull).count) val result_cnt = sc.parallelize(missing_cnt.zip(columns)).toDF("missing_cnt", "column_name") result_cnt.show() import breeze.stats._ def meanAge(dataFrame: DataFrame): Double = { dataFrame .select("Age") .na.drop() .agg(round(mean("Age"), 0)) .first() .getDouble(0) } val df2 = df1 .na.fill(Map( "Age" -> meanAge(df1), "Embarked" -> "S")) val survived_count = df2.groupBy("Survived").count() survived_count.show() survived_count.coalesce(1).write.option("header", "true").csv("datasets/survived_count.csv") } }
我发现问题可能出在 `meanAge` 函数中的 `round(mean("Age"), 0)` 这句代码上。这个代码中的 `mean` 函数是 Spark SQL 中的函数,但是在 `breeze.stats` 包中也有一个名为 `mean` 的函数,这可能导致了混淆。为了解决这个问题,你可以尝试将 `mean` 函数改名为其他的变量名,例如:
```scala
def meanAge(dataFrame: DataFrame): Double = {
val ageMean = dataFrame
.select("Age")
.na.drop()
.agg(round(mean("Age"), 0))
.first()
.getDouble(0)
ageMean
}
```
这样就将 `mean` 函数改名为了 `ageMean` 变量,并避免了与 `breeze.stats` 包中的函数重名的问题。希望对你有帮助!
def analyse(data: DataFrame): DataFrame = { val spark: SparkSession = data.sparkSession import spark.implicits._ logInfo("/**************** 分析各个用户ID的交易金额最大值、最小值、平均值等信息,按照平均值降序排列 ****************/") val save = udf { x: Double => x.formatted("%.2f") } }
好的,你可以将代码修改为以下形式来实现对 DataFrame 的分析操作并返回一个 DataFrame:
```
def analyse(data: DataFrame): DataFrame = {
val spark: SparkSession = data.sparkSession
import spark.implicits._
logInfo("/**************** 分析各个用户ID的交易金额最大值、最小值、平均值等信息,按照平均值降序排列 ****************/")
// 将原始 DataFrame 按用户 ID 进行分组,并且计算每个用户的交易金额总和、最大值、最小值、平均值等信息
val result = data.groupBy("user_id")
.agg(
sum("amount").as("total_amount"),
max("amount").as("max_amount"),
min("amount").as("min_amount"),
avg("amount").as("avg_amount")
)
// 对结果按照平均值降序排列,并将交易金额保留两位小数
result.select($"user_id", $"total_amount", $"max_amount", $"min_amount", save($"avg_amount").as("avg_amount"))
.orderBy($"avg_amount".desc)
}
```
这段代码使用了 Spark SQL 中的 `groupBy`、`agg` 和 `orderBy` 等函数,对原始的 DataFrame 进行了分组、聚合和排序等操作,并且定义了一个 UDF 函数 `save`,用于将交易金额保留两位小数。最后,该代码返回一个按照用户平均交易金额降序排列的 DataFrame。
阅读全文