spark-core source code study notes 2: application submit and Driver registration flow, using JavaWordCount as an example
These notes walk through how a Spark application is submitted and how the Driver gets registered, using a Java WordCount program as the running example.
First, write the code for the job to be submitted:
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        // The master URL is supplied by spark-submit, so only the app name is set here.
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the input file and split each line into words.
        JavaRDD<String> lines = sc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // Pair each word with 1, then sum the counts per word.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

        counts.saveAsTextFile(args[1]);
        sc.stop();
    }
}
```
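If you want to smoke-test the transformation chain before submitting to a cluster, the same logic can be run locally. Below is a minimal sketch; the class name `WordCountLocalTest`, the `local[*]` master, and the in-memory sample data are illustrative assumptions, not part of the original example:
```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLocalTest {
    public static void main(String[] args) {
        // local[*] runs the driver and executors in this one JVM -- for testing only.
        SparkConf conf = new SparkConf().setAppName("WordCountLocalTest").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Use an in-memory dataset instead of a file, so no paths are needed.
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("a b a", "b c"));
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Expected output (order may vary): (a,2), (b,2), (c,1)
        List<Tuple2<String, Integer>> result = counts.collect();
        result.forEach(System.out::println);

        sc.stop();
    }
}
```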
Next, submit the job and register the Driver. The version below extends WordCount with a custom SparkListener that logs the application's start and end events:
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Register the custom listener on the underlying SparkContext;
        // JavaSparkContext.sc() exposes the Scala SparkContext that drives the app.
        sc.sc().addSparkListener(new MySparkListener());

        JavaRDD<String> lines = sc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
        counts.saveAsTextFile(args[1]);
        sc.stop();
    }
}

// SparkListener provides no-op defaults; override only the events of interest.
class MySparkListener extends SparkListener {
    @Override
    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        System.out.println("Application started: " + applicationStart.appName());
    }

    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        System.out.println("Application ended: " + applicationEnd.time());
    }
}
```
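As an alternative to calling `addSparkListener` in code, Spark can instantiate listeners itself via the `spark.extraListeners` configuration property (a comma-separated list of class names, each needing a zero-argument constructor). A minimal sketch of that variant; the class name `WordCountWithConfigListener` is illustrative:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountWithConfigListener {
    public static void main(String[] args) {
        // spark.extraListeners tells Spark to construct and register the listener
        // during SparkContext startup; MySparkListener must be on the classpath.
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .set("spark.extraListeners", "MySparkListener");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ... same WordCount logic as above ...

        sc.stop();
    }
}
```
The same property can also be supplied at submit time with `--conf spark.extraListeners=...`, which keeps the application code free of listener wiring.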
Note that the Driver itself is registered automatically: when the SparkContext above is constructed, its scheduler backend registers the application with the cluster manager (in standalone mode, by sending a RegisterApplication message to the Master), and the custom listener is simply added on top of that. Before submitting the job, the Spark cluster must already be running, and the input path and output path must be passed to the program through the args array.
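The submit step itself can also be driven from Java through Spark's launcher API (`org.apache.spark.launcher.SparkLauncher`), which wraps spark-submit in-process. A minimal sketch, where the jar path, master URL, and data paths are placeholder assumptions:
```java
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SubmitWordCount {
    public static void main(String[] args) throws Exception {
        // All paths and the master URL below are placeholders for your environment.
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/path/to/wordcount.jar")  // packaged application jar
                .setMainClass("WordCount")
                .setMaster("spark://master-host:7077")     // standalone master URL
                .addAppArgs("/input/path", "/output/path") // become args[0] and args[1]
                .startApplication();

        // Poll until the application reaches a terminal state.
        while (!handle.getState().isFinal()) {
            Thread.sleep(1000);
        }
        System.out.println("Final state: " + handle.getState());
    }
}
```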