spark wordcount代码详解
时间: 2023-11-05 17:07:36 浏览: 56
下面是一个简单的Spark WordCount程序:
```python
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("WordCount")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs://localhost:9000/input/sample.txt")
words = text_file.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.saveAsTextFile("hdfs://localhost:9000/output/wordcount")
```
程序的功能是统计一个文本文件中每个单词出现的次数,并将结果保存到HDFS上。
下面是代码的详细解释:
首先,我们导入了`SparkConf`和`SparkContext`。这两个类是Spark的核心类,`SparkConf`用于配置Spark应用程序的属性,`SparkContext`用于连接Spark集群。
```python
from pyspark import SparkConf, SparkContext
```
接下来,我们创建了一个`SparkConf`实例,并给应用程序命名为`WordCount`。我们还创建了一个`SparkContext`实例,并将`SparkConf`传递给它。这些代码将初始化Spark应用程序并连接到Spark集群。
```python
conf = SparkConf().setAppName("WordCount")
sc = SparkContext(conf=conf)
```
然后,我们使用`textFile()`方法从HDFS中读取输入文件,并创建一个RDD(弹性分布式数据集)。
```python
text_file = sc.textFile("hdfs://localhost:9000/input/sample.txt")
```
接下来,我们使用`flatMap()`方法将每行文本拆分成单词,并创建一个新的RDD。
```python
words = text_file.flatMap(lambda line: line.split(" "))
```
然后,我们使用`map()`方法将每个单词转换为一个`(单词, 1)`的键值对,并创建一个新的RDD。
```python
word_counts = words.map(lambda word: (word, 1))
```
接下来,我们使用`reduceByKey()`方法对每个单词的计数进行聚合,并创建一个新的RDD。
```python
word_counts = word_counts.reduceByKey(lambda a, b: a + b)
```
最后,我们使用`saveAsTextFile()`方法将结果保存到HDFS上,并指定输出目录。
```python
word_counts.saveAsTextFile("hdfs://localhost:9000/output/wordcount")
```
这就是完整的Spark WordCount程序。