import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions._object CheckDuplicateAttributes { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("CheckDuplicateAttributes") .master("local[*]") .getOrCreate() val inputPath = "hdfs://localhost:9000/platform_data" val outputPath = "hdfs://localhost:9000/platform_data/output" val df = spark.read.format("csv") .option("header", "true") .option("inferSchema", "true") .load(inputPath) val datePattern = "yyyy-MM-dd" val dfFormatted = df.select( date_format(col("申请时间"), datePattern).as("申请日期"), date_format(col("创建时间"), datePattern).as("创建日期"), date_format(col("created_at"), datePattern).as("创建日期2"), date_format(col("updated_at"), datePattern).as("更新日期") ) val count1 = dfFormatted.filter(col("申请日期") === col("创建日期")).count() val count2 = dfFormatted.filter(col("创建日期2") === col("更新日期")).count() println(s"两组属性同时相等的数据条数为${count1}条") println(s"两组属性同时相等的数据条数为${count2}条") spark.stop() }}
时间: 2023-07-21 16:51:31 浏览: 87
这段代码是一个检查重复属性的 Spark 应用程序。它从指定的路径读取 CSV 文件,然后选择其中的四个属性,并将它们格式化为日期格式。接着,它分别计算了两组属性同时相等的数据条数,并将结果打印输出。
如果需要改进这段代码,有以下几点可以考虑:
1. 文件路径硬编码:应该将文件路径作为参数传入程序,而不是硬编码在代码中。
2. 日志输出:应该使用日志框架来输出日志信息,而不是直接使用 println 函数。
3. 变量命名:应该使用更加具有描述性的变量名,使得代码更易读。
4. 可读性:应该更加注重代码的可读性,比如使用空格、缩进等。
5. 异常处理:应该添加异常处理机制,以避免出现意外情况导致程序崩溃。
6. 可扩展性:应该将代码设计为可扩展的,以方便后续的修改和维护。
以上是一些常见的代码改进点,具体可以根据实际情况进行调整。
相关问题
代码如下: import breeze.numerics.round import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DoubleType, IntegerType} import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.DataFrame object Titanic_c { def main(args: Array[String]) = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName("Titanic_c").setMaster("local[2]") val sc = new SparkContext(conf) val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Titanic") .getOrCreate; val df = spark.read .format("csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("datasets/Titanic_s.csv") import spark.implicits._ df.withColumn("Pclass", df("Pclass").cast(IntegerType)) .withColumn("Survived", df("Survived").cast(IntegerType)) .withColumn("Age", df("Age").cast(DoubleType)) .withColumn("SibSp", df("SibSp").cast(IntegerType)) .withColumn("Parch", df("Parch").cast(IntegerType)) .withColumn("Fare", df("Fare").cast(DoubleType)) val df1 = df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin") val columns = df1.columns val missing_cnt = columns.map(x => df1.select(col(x)).where(col(x).isNull).count) val result_cnt = sc.parallelize(missing_cnt.zip(columns)).toDF("missing_cnt", "column_name") result_cnt.show() import breeze.stats._ def meanAge(dataFrame: DataFrame): Double = { dataFrame .select("Age") .na.drop() .agg(round(mean("Age"), 0)) .first() .getDouble(0) } val df2 = df1 .na.fill(Map( "Age" -> meanAge(df1), "Embarked" -> "S")) val survived_count = df2.groupBy("Survived").count() survived_count.show() survived_count.coalesce(1).write.option("header", "true").csv("datasets/survived_count.csv") } }
我发现问题可能出在 `meanAge` 函数中的 `round(mean("Age"), 0)` 这句代码上。这个代码中的 `mean` 函数是 Spark SQL 中的函数,但是在 `breeze.stats` 包中也有一个名为 `mean` 的函数,这可能导致了混淆。为了解决这个问题,你可以尝试将 `mean` 函数改名为其他的变量名,例如:
```scala
def meanAge(dataFrame: DataFrame): Double = {
val ageMean = dataFrame
.select("Age")
.na.drop()
.agg(round(mean("Age"), 0))
.first()
.getDouble(0)
ageMean
}
```
这样就将 `mean` 函数改名为了 `ageMean` 变量,并避免了与 `breeze.stats` 包中的函数重名的问题。希望对你有帮助!
import breeze.numerics.round import breeze.stats.mean import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DoubleType, IntegerType} import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.DataFrame object Titanic_c { def main(args: Array[String]) = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName("Titanic_c").setMaster("local[2]") val sc = new SparkContext(conf) val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Titanic") .getOrCreate; val df = spark.read .format("csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("datasets/Titanic_s.csv") import spark.implicits._ df.withColumn("Pclass", df("Pclass").cast(IntegerType)) .withColumn("Survived", df("Survived").cast(IntegerType)) .withColumn("Age", df("Age").cast(DoubleType)) .withColumn("SibSp", df("SibSp").cast(IntegerType)) .withColumn("Parch", df("Parch").cast(IntegerType)) .withColumn("Fare", df("Fare").cast(DoubleType)) val df1 = df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin") val columns = df1.columns val missing_cnt = columns.map(x => df1.select(col(x)).where(col(x).isNull).count) val result_cnt = sc.parallelize(missing_cnt.zip(columns)).toDF("missing_cnt", "column_name") result_cnt.show() def meanAge(dataFrame: DataFrame): Double = { dataFrame .select("Age") .na.drop() .agg(round(mean("Age"), )) .first() .getDouble(0) } val df2 = df1 .na.fill(Map( "Age" -> meanAge(df1), "Embarked" -> "S")) val survived_count = df2.groupBy("Survived").count() survived_count.show() survived_count.coalesce(1).write.option("header", "true").csv("datasets/survived_count.csv") } }
这是一个使用Spark读取Titanic数据集并对其进行预处理的Scala代码。这个代码将CSV文件读取为一个DataFrame,然后对其中的缺失值进行处理,并计算了生还和死亡人数的统计信息,最后将结果写入CSV文件。
首先,代码创建了一个SparkConf和SparkContext对象,然后创建了一个SparkSession对象,用于读取CSV文件。接下来,代码对数据集中的一些列进行转换,将它们转换为正确的数据类型。然后,代码删除了一些不需要的列。接着,代码计算了每列中缺失值的数量,并将结果存储在一个DataFrame中。接下来,代码定义了一个函数meanAge,用于计算年龄的平均值,并将缺失值替换为该平均值。最后,代码计算了生还和死亡人数的统计信息,并将结果写入CSV文件。
希望这能帮到你!
阅读全文