写一个spark Wordcount代码
时间: 2023-03-10 21:39:45 浏览: 124
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
相关问题
spark wordcount代码详解
下面是一个简单的Spark WordCount程序:
```python
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("WordCount")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs://localhost:9000/input/sample.txt")
words = text_file.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.saveAsTextFile("hdfs://localhost:9000/output/wordcount")
```
程序的功能是统计一个文本文件中每个单词出现的次数,并将结果保存到HDFS上。
下面是代码的详细解释:
首先,我们导入了`SparkConf`和`SparkContext`。这两个类是Spark的核心类,`SparkConf`用于配置Spark应用程序的属性,`SparkContext`用于连接Spark集群。
```python
from pyspark import SparkConf, SparkContext
```
接下来,我们创建了一个`SparkConf`实例,并给应用程序命名为`WordCount`。我们还创建了一个`SparkContext`实例,并将`SparkConf`传递给它。这些代码将初始化Spark应用程序并连接到Spark集群。
```python
conf = SparkConf().setAppName("WordCount")
sc = SparkContext(conf=conf)
```
然后,我们使用`textFile()`方法从HDFS中读取输入文件,并创建一个RDD(弹性分布式数据集)。
```python
text_file = sc.textFile("hdfs://localhost:9000/input/sample.txt")
```
接下来,我们使用`flatMap()`方法将每行文本拆分成单词,并创建一个新的RDD。
```python
words = text_file.flatMap(lambda line: line.split(" "))
```
然后,我们使用`map()`方法将每个单词转换为一个`(单词, 1)`的键值对,并创建一个新的RDD。
```python
word_counts = words.map(lambda word: (word, 1))
```
接下来,我们使用`reduceByKey()`方法对每个单词的计数进行聚合,并创建一个新的RDD。
```python
word_counts = word_counts.reduceByKey(lambda a, b: a + b)
```
最后,我们使用`saveAsTextFile()`方法将结果保存到HDFS上,并指定输出目录。
```python
word_counts.saveAsTextFile("hdfs://localhost:9000/output/wordcount")
```
这就是完整的Spark WordCount程序。
spark wordcount代码 java版
很高兴为您提供帮助!以下是 Java 版本的 Spark Word Count 代码:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
public class JavaSparkWordCount {
public static void main(String[] args) {
// 创建 SparkConf 对象
SparkConf conf = new SparkConf().setAppName("JavaSparkWordCount").setMaster("local");
// 创建 JavaSparkContext 对象
JavaSparkContext sc = new JavaSparkContext(conf);
// 加载文件内容
JavaRDD<String> lines = sc.textFile("file:///path/to/file");
// 转换为单词 RDD
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
// 统计每个单词出现的次数
JavaRDD<Tuple2<String, Integer>> counts = words.map(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
// 输出结果
counts.foreach(tuple -> System.out.println(tuple._1() + ": " + tuple._2()));
// 关闭 JavaSparkContext 对象
sc.stop();
}
}
```
注意,上述代码中的 `file:///path/to/file` 需要替换为你实际的文件路径。
希望这可以解决你的问题。如果你有任何其他问题,欢迎再次联系我。
阅读全文