scala的Spark Streaming编程实现从一台服务器的8888端口上接收课程数据,课程数据需手动在服务器的8888端口输入,输入的课程数据如表所示,每一条数据有2个字段,分别表示课程编号和课程名称,以空格分隔。
时间: 2024-02-09 16:09:08 浏览: 116
好的,首先需要在Scala中导入Spark Streaming相关的依赖包:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
```
然后创建一个SparkConf对象,并设置相关参数:
```scala
val conf = new SparkConf().setAppName("Spark Streaming Course Data").setMaster("local[*]")
```
其中`setAppName`用于设置应用程序的名称,`setMaster`用于设置Spark的运行模式,这里设置为本地模式。
接着创建一个StreamingContext对象,设置每5秒接收一次数据:
```scala
val ssc = new StreamingContext(conf, Seconds(5))
```
然后使用`socketTextStream`方法创建一个DStream对象,用于接收来自服务器8888端口的数据:
```scala
val lines = ssc.socketTextStream("localhost", 8888)
```
接下来需要对接收到的数据进行处理。首先将每条数据按照空格分隔为课程编号和课程名称:
```scala
val courses = lines.map(line => {
val fields = line.split(" ")
(fields(0), fields(1))
})
```
然后对课程数据进行一些操作,比如统计每个课程出现的次数:
```scala
val courseCounts = courses.countByValue()
```
最后将结果打印出来:
```scala
courseCounts.print()
```
完整代码如下:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CourseData {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark Streaming Course Data").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 8888)
val courses = lines.map(line => {
val fields = line.split(" ")
(fields(0), fields(1))
})
val courseCounts = courses.countByValue()
courseCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
```
在程序运行后,可以手动在服务器的8888端口输入课程数据,程序将会实时统计每个课程出现的次数。
阅读全文