用java语言编写spark读取csv文件的代码
时间: 2023-10-06 14:07:05 浏览: 113
spark部署和基础代码的编写
下面是一个简单的 Java 代码示例,使用 Spark 读取 CSV 文件:
```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class SparkCSVReader {
public static void main(String[] args) {
// 创建 SparkSession
SparkSession spark = SparkSession.builder()
.appName("SparkCSVReader")
.master("local[*]")
.getOrCreate();
// 创建 SparkContext
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// 定义 CSV 文件的结构
List<StructField> fields = Arrays.asList(
DataTypes.createStructField("id", DataTypes.LongType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true),
DataTypes.createStructField("gender", DataTypes.StringType, true)
);
StructType schema = DataTypes.createStructType(fields);
// 读取 CSV 文件为 DataFrame
Dataset<Row> df = spark.read()
.option("header", "true")
.option("delimiter", ",")
.schema(schema)
.csv("path/to/csv/file.csv");
// 转换为 JavaRDD
JavaRDD<Row> rdd = df.toJavaRDD();
// 输出结果
rdd.foreach(System.out::println);
// 关闭 SparkContext 和 SparkSession
sc.close();
spark.close();
}
}
```
这个代码示例使用 SparkSession 创建了一个本地模式的 SparkContext,然后定义了 CSV 文件的结构,并使用 Spark SQL 读取 CSV 文件为 DataFrame,最后将 DataFrame 转换为 JavaRDD 并输出结果。注意需要在 pom.xml 中添加 Spark 依赖。
阅读全文