使用Scala的Spark Streaming编程实现从一台服务器的8888端口上接收课程数据,课程数据需手动在服务器的8888端口输入,输入的课程数据如表所示,每一条数据有2个字段,分别表示课程编号和课程名称,以空格分隔。现目标课程是“Hadoop”和“Spark”,
时间: 2024-02-05 17:11:35 浏览: 59
spark-streaming-scala-example:使用Kafka和Cassandra的Spark Streaming作业示例
以下是使用Scala的Spark Streaming编程实现从一台服务器的8888端口上接收课程数据的代码:
```scala
import org.apache.spark._
import org.apache.spark.streaming._
object CourseStreaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("CourseStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 8888)
val courses = lines.map(line => line.split(" ")(1))
val hadoop = courses.filter(course => course == "Hadoop")
val spark = courses.filter(course => course == "Spark")
hadoop.print()
spark.print()
ssc.start()
ssc.awaitTermination()
}
}
```
代码解释:
1. 创建SparkConf对象,设置应用程序名称为“CourseStreaming”。
2. 创建StreamingContext对象,设置批处理间隔为1秒。
3. 从socket端口8888上读取数据,创建DStream对象。
4. 对每条数据进行处理,提取出课程名称。
5. 对课程名称进行过滤,得到目标课程“Hadoop”和“Spark”。
6. 对目标课程进行输出打印。
7. 启动StreamingContext。
8. 阻塞当前线程,等待程序终止。
阅读全文