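Using Spark Streaming in Scala, write a program that receives course data from port 8888 on a server. The course data is typed in manually on port 8888 of that server. Each record has two fields, the course ID and the course name, separated by a space. The target courses are "Hadoop" and "Spark", and the task is to find these two courses together with their course IDs. Example record: `121 Hadoop`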
Time: 2024-02-05 11:11:48
The following Scala Spark Streaming code receives course data from port 8888 on a server and looks up the names and IDs of the target courses:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CourseSearch {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing
    val conf = new SparkConf().setAppName("CourseSearch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each line typed into port 8888 has the form "<course id> <course name>"
    val lines = ssc.socketTextStream("localhost", 8888)

    // Parse into (id, name) pairs, skipping blank or malformed lines
    val courses = lines
      .map(_.trim.split("\\s+"))
      .filter(_.length == 2)
      .map(arr => (arr(0), arr(1)))

    val targetCourses = Set("Hadoop", "Spark")

    // Keep only the target courses and emit (name, id).
    // A plain filter is enough; a DStream cannot be joined directly with an RDD.
    val result = courses
      .filter { case (_, name) => targetCourses.contains(name) }
      .map { case (id, name) => (name, id) }

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
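`socketTextStream` connects to the port as a client, so start a listener on port 8888 first (for example with `nc -lk 8888`), then run the code above and type course IDs and names into the listener, for example: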
```
121 Hadoop
122 Spark
123 Java
```
If these lines arrive within the same 5-second batch, the program prints the following result:
```
(Hadoop,121)
(Spark,122)
```