使用Scala的Spark Streaming编程实现从一台服务器的8888端口上接收课程数据,课程数据需手动在服务器的8888端口输入,输入的课程数据如表所示,每一条数据有2个字段,分别表示课程编号和课程名称,以空格分隔。现目标课程是“Hadoop”和“Spark”,需要查询两门课程及对应课程编号。 121 Hadoop
时间: 2024-02-05 19:11:18 浏览: 77
Spark-Streaming编程指南.docx
下面是Scala的Spark Streaming代码实现:
```scala
import org.apache.spark._
import org.apache.spark.streaming._
object CourseStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("CourseStreaming")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 8888)
val courses = lines.map(line => line.split(" ")).map(fields => (fields(1), fields(0)))
val hadoopCourse = courses.filter(course => course._1 == "Hadoop").map(course => course._2)
val sparkCourse = courses.filter(course => course._1 == "Spark").map(course => course._2)
hadoopCourse.print()
sparkCourse.print()
ssc.start()
ssc.awaitTermination()
}
}
```
这个代码会从本地的8888端口上接收数据,并且将每一行数据按照空格分隔成两个字段,然后将课程名称作为键,课程编号作为值,构成一个二元组。接着,我们可以使用`filter`方法过滤出目标课程,然后使用`map`方法将课程编号提取出来。最后,我们将结果打印出来即可。
执行这个代码后,我们需要手动在服务器的8888端口输入课程数据,例如:
```
121 Hadoop
122 Spark
123 Storm
```
如果其中有Hadoop和Spark,那么对应的课程编号会被打印出来。
阅读全文