请使用自定义分区器,将sc,parallelize(1 to 100,5)重新分为四组,能够被2、3、5整除的各分为一组,不能整除的一组,并将结果保存在HDFS中,保存路径为/FinalExam/parti_RDD。
时间: 2024-02-25 20:52:32 浏览: 57
n, int m) {
FILE *fp;
fp = fopen("students.txt", "w");
if (fp == NULL) {
printf("文件打开失败!");
return;
}
fprintf(fp, "%d %d\n", n, m);
好的,下面是使用自定义分区器将RDD重新分区,并将结果保存到HDFS的Scala代码:
``` for (int i = 0; i < n; i++) {
fprintf(fp, "%d %s ", students[i].id,scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
// 自定义分区器
class MyPartitioner students[i].name);
for (int j = 0; j < m; j++) {
fprintf(fp, "%d ", students(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any[i].score[j]);
}
fprintf(fp, "\n");
}
fclose(fp);
}
// 从文件中读取学): Int = {
val k = key.asInstanceOf[Int]
if (k % 2 == 0 && k % 3 == 0 && k % 5 == 0) {
0
} else {
1
}
}
override生信息
void read_students_from_file(Student *students, int *n, int *m) {
FILE *fp;
fp = fopen("students.txt", "r");
if (fp == NULL) {
printf("文件打开失败!");
return;
def equals(other: Any): Boolean = other match {
case myPartitioner: MyPartitioner =>
myPartitioner.numPartitions == }
fscanf(fp, "%d %d", n, m);
for (int i = 0; i < *n; i numPartitions
case _ =>
false
}
}
// 创建RDD
val rdd: RDD[Int] = sc.parallelize(1 to 100, 5)
// 使用自定义分区器重新分区
val partitionedRDD = rdd.partitionBy++) {
fscanf(fp, "%d %s", &students[i].id, students[i].name);
for (int j = 0; j < *m; j++) {
fscanf(fp, "%d", &students[i].score[j]);
students[i].total(new MyPartitioner(2))
// 将结果保存到HDFS
partitionedRDD.saveAsTextFile("/FinalExam/parti_RDD_score += students[i].score[j];
}
students[i].avg_score = students[i].total_score / (float)(*m);
")
```
这段代码中,我们定义了一个自定义的分区器`MyPartitioner`,它将能够被 }
fclose(fp);
}
```
这个程序实现了您所要求的所有功能,您可以根据需要进行修改2、3、5整除的数分为一组,不能整除的数分为一组,共分为两组。同时,程序也进行了异常处理,可以检查用户输入数据的有效性,在用户输入数据有错误或无效。然后我们创建了一个RDD,使用`parallelize`方法将1到100的数据分成5个分区。接时,不会中断程序的执行,程序具有一定的健壮性。
阅读全文