某高校为大数据相关专业的学生开设了多门课程,为了能够实时地查找出目标课程,需要在IntelliJ IDEA中使用Spark Streaming编程实现从一台服务器的8888端口上接收课程数据,课程数据需手动在服务器的8888端口输入,输入的课程数据如表所示,每一条数据有2个字段,分别表示课程编号和课程名称,以空格分隔。现目标课程是“Hadoop”和“Spark”,需要查询两门课程及对应课程编号。 121 Hadoop
时间: 2023-12-10 15:04:47 浏览: 24
这个问题可以通过以下代码实现:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class CourseSearch {
public static void main(String[] args) throws InterruptedException {
// 创建SparkStreaming上下文
SparkConf conf = new SparkConf().setAppName("CourseSearch").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// 从服务器的8888端口上接收数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 8888);
// 将每行数据切分成课程编号和课程名称
JavaDStream<List<String>> courses = lines.map(line -> Arrays.asList(line.split(" ")));
// 过滤出目标课程
JavaDStream<List<String>> targetCourses = courses.filter(course -> {
String courseName = course.get(1);
return courseName.equals("Hadoop") || courseName.equals("Spark");
});
// 将目标课程转换为(key, value)对
JavaPairDStream<String, String> coursePairs = targetCourses.mapToPair(course -> new Tuple2<>(course.get(1), course.get(0)));
// 输出结果
coursePairs.print();
// 启动SparkStreaming应用
jssc.start();
jssc.awaitTermination();
}
}
```
在输入数据方面,可以在命令行中使用`nc`命令模拟输入数据。首先打开一个终端窗口,输入以下命令:
```bash
nc -lk 8888
```
然后在新的终端窗口中输入课程数据,例如:
```bash
121 Hadoop
122 Spark
123 Hive
124 HBase
```
程序会输出以下结果:
```
-------------------------------------------
Time: 1622769220000 ms
-------------------------------------------
(Hadoop,121)
(Spark,122)
```